Class: Fanforce::Worker::Runner
- Inherits:
-
Object
- Object
- Fanforce::Worker::Runner
show all
- Includes:
- Utils
- Defined in:
- lib/fanforce/worker/runner.rb
Defined Under Namespace
Classes: Timeout
Constant Summary
collapse
- MAX_EXECUTION_TIME =
3300
Instance Method Summary
collapse
Methods included from Utils
included, #iron_queue_id, #log
Constructor Details
#initialize(worker_data, min_execution_time = 300, &code_block) ⇒ Runner
Returns a new instance of Runner.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/fanforce/worker/runner.rb', line 11
def initialize(worker_data, min_execution_time=300, &code_block)
raise "min_execution_time was set to #{min_execution_time}, which is #{min_execution_time - MAX_EXECUTION_TIME} seconds too long" if min_execution_time > MAX_EXECUTION_TIME
log.debug '------------------------------------------------------------------------------------'
log.debug 'LOADING WORKER ENV'
@queue_id = worker_data['queue_id'] || (raise 'worker_data must contain queue_id')
@worker_env = worker_data['env_vars'] || {}
@min_execution_time = min_execution_time
@code_block = code_block
load_env_from_server
load_env_from_worker(@worker_env)
ENV.each {|k,v| log.debug "#{k} = #{v}" }
load_jobs
end
|
Instance Method Details
#delete_job(job = nil) ⇒ Object
107
108
109
110
111
112
113
114
|
# File 'lib/fanforce/worker/runner.rb', line 107
def delete_job(job=nil)
return if job.nil? and @current_job.nil?
(job || @current_job).delete
rescue Exception => e
log.debug "Job could not be deleted: #{e.message}"
ensure
@current_job = nil
end
|
#handle_job_error(e, job, job_data) ⇒ Object
86
87
88
89
90
91
92
93
|
# File 'lib/fanforce/worker/runner.rb', line 86
def handle_job_error(e, job, job_data)
raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil?
delete_job(job)
require_relative 'errors'
log.debug 'REMOVED JOB FROM QUEUE, AND SAVING TO ERROR CACHE...'
Fanforce::Worker::Errors.add(@queue_id, e, job_data, @worker_env)
end
|
#handle_job_loading_error(e, job, job_data) ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/fanforce/worker/runner.rb', line 78
def handle_job_loading_error(e, job, job_data)
raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil?
delete_job(job)
log.debug 'REMOVED JOB FROM QUEUE, BUT COULD NOT SAVE TO ERROR CACHE...'
raise($!, "#{$!}: #{job_data.to_json}", $!.backtrace)
end
|
#job_has_enough_time_to_run ⇒ Object
100
101
102
103
104
105
|
# File 'lib/fanforce/worker/runner.rb', line 100
def job_has_enough_time_to_run
time_since_load = Time.now - Fanforce::Worker::LOADED_AT
return false if time_since_load > MAX_EXECUTION_TIME
return false if worker_time_remaining < @min_execution_time
return true
end
|
#load_env_from_server ⇒ Object
52
53
54
55
56
57
58
59
60
|
# File 'lib/fanforce/worker/runner.rb', line 52
def load_env_from_server
if File.exists?('.developmentenv.rb')
require '.developmentenv'
elsif File.exists?('.stagingenv.rb')
require '.stagingenv'
elsif File.exists?('.productionenv.rb')
require '.productionenv'
end
end
|
#load_env_from_worker(vars) ⇒ Object
62
63
64
|
# File 'lib/fanforce/worker/runner.rb', line 62
def load_env_from_worker(vars)
vars.each {|k,v| ENV[k.to_s]=v }
end
|
#load_jobs ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/fanforce/worker/runner.rb', line 29
def load_jobs
log.debug '------------------------------------------------------------------------------------'
log.debug 'PROCESSING JOBS...'
log.debug '------------------------------------------------------------------------------------'
job_num = 0
job_data = nil
while job_has_enough_time_to_run and (job = Fanforce::Worker.iron_mq.queue(iron_queue_id(@queue_id)).get(timeout: 3600)) do
log.debug "- JOB #{job_num+=1}: #{job.body}"
timeout(worker_time_remaining, Timeout) do
job_data = nil
job_data = Fanforce.decode_json(job.body)
run_job(job, job_data, &@code_block)
end
delete_job
log.debug '------------------------------------------------------------------------------------'
end
delete_job
log.debug 'WINDING DOWN WORKER!'
rescue Exception => e
handle_job_loading_error(e, job, job_data)
end
|
#run_job(job, job_data, &code_block) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/fanforce/worker/runner.rb', line 66
def run_job(job, job_data, &code_block)
@current_job = job
@current_params = job_data[:params]
@current_retries = job_data[:retries]
code_block.call(job_data[:params].clone, retries: job_data[:retries], queue_id: @queue_id)
delete_job(job)
rescue Exception => e
handle_job_error(e, job, job_data)
end
|
#worker_time_remaining ⇒ Object
95
96
97
98
|
# File 'lib/fanforce/worker/runner.rb', line 95
def worker_time_remaining
time_since_load = Time.now - Fanforce::Worker::LOADED_AT
MAX_EXECUTION_TIME - time_since_load
end
|