Class: Fanforce::Workers
- Inherits:
-
Object
- Object
- Fanforce::Workers
- Defined in:
- lib/fanforce/workers/version.rb,
lib/fanforce/workers/workers.rb
Constant Summary collapse
- VERSION =
'0.9.0'
Class Method Summary collapse
- .add_error(queue_id, error) ⇒ Object
- .current_job ⇒ Object
- .current_job=(job) ⇒ Object
- .current_params ⇒ Object
- .current_params=(params) ⇒ Object
- .current_queue_id ⇒ Object
- .current_queue_id=(queue_id) ⇒ Object
- .current_retries ⇒ Object
- .current_retries=(retries) ⇒ Object
- .current_worker_env ⇒ Object
- .current_worker_env=(env_vars) ⇒ Object
- .delete_job(job = nil) ⇒ Object
- .enqueue(queue_id, params, options = {}) ⇒ Object
- .retry(options) ⇒ Object
- .run(worker_data, &code_block) ⇒ Object
- .run_job(job, &code_block) ⇒ Object
- .set_env_vars(vars) ⇒ Object
Instance Method Summary collapse
- #add_error(queue_id, error) ⇒ Object
- #delete_error(queue_id, job_id, details_id) ⇒ Object
- #enqueue(queue_id, params, options = {}) ⇒ Object
- #error_details(queue_id, details_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Workers
constructor
A new instance of Workers.
- #iron_cache ⇒ Object
- #iron_mq ⇒ Object
- #retry_error(queue_id, job_id, details_id) ⇒ Object
- #truncate(text, length = 130, truncate_string = "...") ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Workers
Returns a new instance of Workers.
3 4 5 |
# File 'lib/fanforce/workers/workers.rb', line 3 def initialize(opts={}) @opts = opts end |
Class Method Details
.add_error(queue_id, error) ⇒ Object
86 87 88 |
# File 'lib/fanforce/workers/workers.rb', line 86 def self.add_error(queue_id, error) self.new.add_error(queue_id, error) end |
.current_job ⇒ Object
128 129 130 |
# File 'lib/fanforce/workers/workers.rb', line 128 def self.current_job @current_job end |
.current_job=(job) ⇒ Object
124 125 126 |
# File 'lib/fanforce/workers/workers.rb', line 124 def self.current_job=(job) @current_job = job end |
.current_params ⇒ Object
112 113 114 |
# File 'lib/fanforce/workers/workers.rb', line 112 def self.current_params @current_params end |
.current_params=(params) ⇒ Object
108 109 110 |
# File 'lib/fanforce/workers/workers.rb', line 108 def self.current_params=(params) @current_params = params end |
.current_queue_id ⇒ Object
96 97 98 |
# File 'lib/fanforce/workers/workers.rb', line 96 def self.current_queue_id @current_queue_id end |
.current_queue_id=(queue_id) ⇒ Object
92 93 94 |
# File 'lib/fanforce/workers/workers.rb', line 92 def self.current_queue_id=(queue_id) @current_queue_id = queue_id end |
.current_retries ⇒ Object
120 121 122 |
# File 'lib/fanforce/workers/workers.rb', line 120 def self.current_retries @current_retries end |
.current_retries=(retries) ⇒ Object
116 117 118 |
# File 'lib/fanforce/workers/workers.rb', line 116 def self.current_retries=(retries) @current_retries = retries end |
.current_worker_env ⇒ Object
104 105 106 |
# File 'lib/fanforce/workers/workers.rb', line 104 def self.current_worker_env @current_worker_env end |
.current_worker_env=(env_vars) ⇒ Object
100 101 102 |
# File 'lib/fanforce/workers/workers.rb', line 100 def self.current_worker_env=(env_vars) @current_worker_env = env_vars end |
.delete_job(job = nil) ⇒ Object
192 193 194 195 196 |
# File 'lib/fanforce/workers/workers.rb', line 192 def self.delete_job(job=nil) return if job.nil? and current_job.nil? (job || current_job).delete self.current_job = nil end |
.enqueue(queue_id, params, options = {}) ⇒ Object
82 83 84 |
# File 'lib/fanforce/workers/workers.rb', line 82 def self.enqueue(queue_id, params, ={}) self.new.enqueue(queue_id, params, ) end |
.retry(options) ⇒ Object
154 155 156 |
# File 'lib/fanforce/workers/workers.rb', line 154 def self.retry() self.new.enqueue(current_queue_id, current_params, .merge(retries: current_retries + 1)) end |
.run(worker_data, &code_block) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/fanforce/workers/workers.rb', line 132 def self.run(worker_data, &code_block) require '.pluginenv' require 'iron_mq' require 'iron_cache' require 'fanforce/api' require 'active_support/all' self.current_queue_id = worker_data['queue_id'] self.current_worker_env = worker_data['env_vars'] queue = IronMQ::Client.new.queue(current_queue_id) job_num = 0 puts 'PROCESSING...' while (job = queue.get(timeout: 3600)) do puts "JOB #{job_num+=1}: #{job.body}" run_job job, &code_block self.delete_job end self.delete_job puts 'DONE' end |
.run_job(job, &code_block) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/fanforce/workers/workers.rb', line 158 def self.run_job(job, &code_block) puts '----------------------------------------------------------' print 'PROCESSING MESSAGE: ' task_data = Fanforce.decode_json(job.body) self.current_job = job self.current_params = task_data[:params] self.current_retries = task_data[:retries] set_env_vars(current_worker_env) code_block.call(task_data[:params].clone, retries: task_data[:retries], queue_id: current_queue_id) self.delete_job(job) rescue Exception => e if job.nil? puts 'MESSAGE IS NIL' return end error = task_data.merge( exception: e.class.name, message: e., backtrace: e.backtrace, errored_at: Time.now, env_vars: current_worker_env ) error[:curl_command] = e.curl_command if e.respond_to?(:curl_command) puts "ADDING TO ERROR CACHE: #{error.to_json}" self.delete_job(job) puts 'DELETED MESSAGE' self.add_error current_queue_id, error end |
.set_env_vars(vars) ⇒ Object
198 199 200 |
# File 'lib/fanforce/workers/workers.rb', line 198 def self.set_env_vars(vars) vars.each {|k,v| ENV[k.to_s]=v } end |
Instance Method Details
#add_error(queue_id, error) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fanforce/workers/workers.rb', line 23 def add_error(queue_id, error) require 'uuidtools' details_id = UUIDTools::UUID.random_create.to_s iron_cache.cache("#{queue_id}-ERRORS").put(details_id, error.to_json) iron_mq.queue("#{queue_id}-ERRORS").post({ details_id: details_id, exception: truncate(error[:exception]), message: truncate(error[:message].to_s), params: truncate(error[:params].to_json), errored_at: error[:errored_at], retries: error[:retries], env_vars: truncate(error[:env_vars].to_json), curl_command: truncate(error[:curl_command].to_s) }.to_json) rescue => e puts '-----------------------------------------------------' puts 'WORKER ERROR WHILE RECOVERING FROM JOB ERROR:' puts e. puts e.backtrace puts '-----------------------------------------------------' puts 'JOB ERROR:' puts "details_id: #{details_id}" puts "exception: #{truncate(error[:exception])}" puts "message: #{truncate(error[:message].to_s)}" puts "params: #{truncate(error[:params].to_json)}" puts "errored_at: #{error[:errored_at]}" puts "retries: #{error[:retries]}" puts "env_vars: #{truncate(error[:env_vars].to_json)}" puts "curl_command: #{truncate(error[:curl_command].to_s)}" end |
#delete_error(queue_id, job_id, details_id) ⇒ Object
54 55 56 57 |
# File 'lib/fanforce/workers/workers.rb', line 54 def delete_error(queue_id, job_id, details_id) iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) iron_cache.cache("#{queue_id}-ERRORS").delete(details_id) end |
#enqueue(queue_id, params, options = {}) ⇒ Object
17 18 19 20 21 |
# File 'lib/fanforce/workers/workers.rb', line 17 def enqueue(queue_id, params, ={}) retries = ([:retries].present?) ? .delete(:retries) : 0 raise 'Params being sent to the queue must be a Hash' if !params.is_a?(Hash) iron_mq.queue(queue_id).post({params: params, retries: retries}.to_json, ) end |
#error_details(queue_id, details_id) ⇒ Object
59 60 61 62 |
# File 'lib/fanforce/workers/workers.rb', line 59 def error_details(queue_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) MultiJson.load(cache.value, :symbolize_keys => true) end |
#iron_cache ⇒ Object
12 13 14 15 |
# File 'lib/fanforce/workers/workers.rb', line 12 def iron_cache require 'iron_cache' @iron_cache ||= IronCache::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end |
#iron_mq ⇒ Object
7 8 9 10 |
# File 'lib/fanforce/workers/workers.rb', line 7 def iron_mq require 'iron_mq' @iron_mq ||= IronMQ::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end |
#retry_error(queue_id, job_id, details_id) ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/fanforce/workers/workers.rb', line 64 def retry_error(queue_id, job_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) cache_data = MultiJson.load(cache.value, :symbolize_keys => true) enqueue(queue_id, cache_data[:params], :retries => cache_data[:retries] + 1) cache.delete and iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) end |
#truncate(text, length = 130, truncate_string = "...") ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/fanforce/workers/workers.rb', line 72 def truncate(text, length=130, truncate_string="...") if text l = length - truncate_string.chars.to_a.length chars = text.chars.to_a (chars.length > length ? chars[0...l].join('') + truncate_string : text).to_s end end |