Class: Fanforce::Worker::Runner

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/fanforce/worker/runner.rb

Defined Under Namespace

Classes: Timeout

Constant Summary collapse

MAX_EXECUTION_TIME =
3300

Instance Method Summary collapse

Methods included from Utils

included, #iron_queue_id, #log

Constructor Details

#initialize(worker_data, min_execution_time = 300, &code_block) ⇒ Runner

Returns a new instance of Runner.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fanforce/worker/runner.rb', line 11

# Sets up a runner for one worker invocation and immediately starts
# processing jobs.
#
# Validates the time budget, stores the queue id / worker env vars / block,
# loads environment variables (server file first, then worker-supplied
# values), logs the resulting environment, and finally calls #load_jobs.
#
# NOTE(review): every ENV entry is written to the debug log, which would
# include any credentials present in the environment - confirm this is wanted.
#
# @param worker_data [Hash] read with String keys: 'queue_id' (required) and
#   'env_vars' (optional, defaults to an empty Hash)
# @param min_execution_time [Numeric] must not exceed MAX_EXECUTION_TIME
# @param code_block [Proc] stored in @code_block for later use
# @raise [RuntimeError] if min_execution_time is greater than
#   MAX_EXECUTION_TIME, or if worker_data has no truthy 'queue_id'
def initialize(worker_data, min_execution_time = 300, &code_block)
  if min_execution_time > MAX_EXECUTION_TIME
    excess = min_execution_time - MAX_EXECUTION_TIME
    raise "min_execution_time was set to #{min_execution_time}, which is #{excess} seconds too long"
  end

  log.debug '------------------------------------------------------------------------------------'
  log.debug 'LOADING WORKER ENV'

  queue_id = worker_data['queue_id']
  raise 'worker_data must contain queue_id' unless queue_id

  @queue_id           = queue_id
  @worker_env         = worker_data['env_vars'] || {}
  @min_execution_time = min_execution_time
  @code_block         = code_block

  # Server-side settings are loaded first so worker-supplied values win.
  load_env_from_server
  load_env_from_worker(@worker_env)
  ENV.each_pair do |name, value|
    log.debug "#{name} = #{value}"
  end

  load_jobs
end

Instance Method Details

#delete_job(job = nil) ⇒ Object



107
108
109
110
111
112
113
114
# File 'lib/fanforce/worker/runner.rb', line 107

# Removes a job from the queue on a best-effort basis.
#
# Deletes +job+ when one is given, otherwise the job remembered in
# @current_job. Ordinary failures while deleting are logged and swallowed.
# @current_job is always reset to nil afterwards, even on failure.
#
# @param job [#delete, nil] job to delete; falls back to @current_job
# @return [Object, nil] nil when there is nothing to delete
def delete_job(job = nil)
  target = job || @current_job
  return if target.nil?

  target.delete
rescue StandardError => e
  # Only ordinary errors are swallowed here. Rescuing Exception would also
  # hide Interrupt, SystemExit and similar, making the worker unstoppable
  # while a delete is in flight.
  log.debug "Job could not be deleted: #{e.message}"
ensure
  @current_job = nil
end

#handle_job_error(e, job, job_data) ⇒ Object

Raises:

  • ($!)


86
87
88
89
90
91
92
93
# File 'lib/fanforce/worker/runner.rb', line 86

# Handles an exception raised while a job's code block was running.
#
# With no job, the exception is re-raised with "THERE IS NO JOB" appended to
# its message. Otherwise the job is deleted from the queue and the failure is
# handed to Fanforce::Worker::Errors.add together with the queue id, job data
# and worker env.
#
# @param e [Exception] the exception being handled
# @param job [#delete, nil] the job that failed
# @param job_data [Object] decoded job payload, passed through to Errors.add
# @return [Object] whatever Fanforce::Worker::Errors.add returns
# @raise [Exception] +e+ (same class, extended message) when +job+ is nil
def handle_job_error(e, job, job_data)
  # Use the exception that was passed in rather than the global $!, which is
  # nil whenever this method is invoked outside of a rescue clause.
  raise e, "#{e}: THERE IS NO JOB", e.backtrace if job.nil?

  delete_job(job)
  # Loaded lazily: the error cache is only needed once a job has failed.
  require_relative 'errors'
  log.debug 'REMOVED JOB FROM QUEUE, AND SAVING TO ERROR CACHE...'
  Fanforce::Worker::Errors.add(@queue_id, e, job_data, @worker_env)
end

#handle_job_loading_error(e, job, job_data) ⇒ Object

Raises:

  • ($!)


78
79
80
81
82
83
84
# File 'lib/fanforce/worker/runner.rb', line 78

# Handles an exception raised while jobs were being fetched or dispatched.
#
# Always re-raises +e+ with an extended message: "THERE IS NO JOB" when no
# job is available, otherwise the job is deleted from the queue first and the
# JSON-encoded job data is appended to the message.
#
# @param e [Exception] the exception being handled
# @param job [#delete, nil] the job being processed, if any
# @param job_data [#to_json] decoded job payload (may be nil)
# @raise [Exception] +e+ (same class, extended message) in every case
def handle_job_loading_error(e, job, job_data)
  # Use the exception that was passed in rather than the global $!, which is
  # nil whenever this method is invoked outside of a rescue clause.
  raise e, "#{e}: THERE IS NO JOB", e.backtrace if job.nil?

  delete_job(job)
  log.debug 'REMOVED JOB FROM QUEUE, BUT COULD NOT SAVE TO ERROR CACHE...'
  raise e, "#{e}: #{job_data.to_json}", e.backtrace
end

#job_has_enough_time_to_run ⇒ Object



100
101
102
103
104
105
# File 'lib/fanforce/worker/runner.rb', line 100

# Tells whether another job may be started within this worker's time budget.
#
# A job may start only if the time elapsed since Fanforce::Worker::LOADED_AT
# has not exceeded MAX_EXECUTION_TIME and the remaining worker time is at
# least @min_execution_time.
#
# @return [Boolean]
def job_has_enough_time_to_run
  elapsed = Time.now - Fanforce::Worker::LOADED_AT
  elapsed <= MAX_EXECUTION_TIME && worker_time_remaining >= @min_execution_time
end

#load_env_from_server ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/fanforce/worker/runner.rb', line 52

# Loads server-side environment settings by requiring the first env file
# that exists in the current working directory. Candidates are checked in
# this order: .developmentenv.rb, .stagingenv.rb, .productionenv.rb.
#
# NOTE(review): the existence check is relative to the working directory,
# while `require` resolves the name through $LOAD_PATH - presumably the
# worker runs with its directory on the load path; confirm.
#
# @return [Boolean, nil] the result of `require`, or nil when no file exists
def load_env_from_server
  env_file = %w[.developmentenv .stagingenv .productionenv].find do |name|
    # File.exists? was deprecated and then removed in Ruby 3.2;
    # File.exist? works on every Ruby version.
    File.exist?("#{name}.rb")
  end
  require env_file if env_file
end

#load_env_from_worker(vars) ⇒ Object



62
63
64
# File 'lib/fanforce/worker/runner.rb', line 62

# Copies worker-supplied variables into the process environment.
#
# Keys and values are converted to Strings because ENV rejects anything
# else (a numeric or boolean value would raise TypeError). A nil value is
# passed through unchanged, which removes the variable from ENV.
#
# @param vars [Hash] variable names mapped to values
# @return [Hash] +vars+ itself
def load_env_from_worker(vars)
  vars.each do |name, value|
    ENV[name.to_s] = value.nil? ? nil : value.to_s
  end
end

#load_jobs ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fanforce/worker/runner.rb', line 29

# Main processing loop: pulls jobs from the worker's queue one at a time and
# runs each through #run_job until the queue returns nothing or there is not
# enough time left to start another job.
#
# Each job is decoded and run under a timeout equal to the remaining worker
# time. Any exception escaping the loop is passed to
# #handle_job_loading_error together with the job and job data that were in
# flight (both may be nil).
#
# @return [Object] result of the final log call
# @raise [Exception] whatever #handle_job_loading_error raises
def load_jobs
  log.debug '------------------------------------------------------------------------------------'
  log.debug 'PROCESSING JOBS...'
  log.debug '------------------------------------------------------------------------------------'
  job_num = 0
  job_data = nil
  while job_has_enough_time_to_run && (job = Fanforce::Worker.iron_mq.queue(iron_queue_id(@queue_id)).get(timeout: 3600))
    job_num += 1
    log.debug "- JOB #{job_num}: #{job.body}"
    # The bare Kernel-level `timeout` helper was deprecated and later removed
    # from Ruby, so call the stdlib module directly. The leading :: is
    # required because this class defines its own nested Timeout constant,
    # which is what the second argument (the exception to raise) refers to.
    ::Timeout.timeout(worker_time_remaining, Timeout) do
      # Reset first so a decode failure is not reported with stale data.
      job_data = nil
      job_data = Fanforce.decode_json(job.body)
      run_job(job, job_data, &@code_block)
    end
    delete_job
    log.debug '------------------------------------------------------------------------------------'
  end
  delete_job
  log.debug 'WINDING DOWN WORKER!'

# Deliberately broad: the handler removes the in-flight job and re-raises.
rescue Exception => e
  handle_job_loading_error(e, job, job_data)
end

#run_job(job, job_data, &code_block) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fanforce/worker/runner.rb', line 66

# Runs a single job through the given block and removes it from the queue.
#
# Remembers the job, its params and its retry count in instance variables,
# calls the block with a copy of the params plus the retries/queue_id
# options, then deletes the job. Any exception raised along the way is
# forwarded to #handle_job_error.
#
# @param job [Object] the queue job being processed
# @param job_data [Hash] decoded payload, read via :params and :retries
# @param code_block [Proc] receives (params_copy, retries:, queue_id:)
# @return [Object] result of #delete_job, or of #handle_job_error on failure
def run_job(job, job_data, &code_block)
  begin
    @current_job     = job
    @current_params  = job_data[:params]
    @current_retries = job_data[:retries]

    # The block gets a clone so it cannot mutate the remembered params object.
    params_copy = @current_params.clone
    code_block.call(params_copy, retries: @current_retries, queue_id: @queue_id)
    delete_job(job)
  rescue Exception => job_error
    handle_job_error(job_error, job, job_data)
  end
end

#worker_time_remaining ⇒ Object



95
96
97
98
# File 'lib/fanforce/worker/runner.rb', line 95

# Seconds left in this worker's time budget: MAX_EXECUTION_TIME minus the
# time elapsed since Fanforce::Worker::LOADED_AT. Negative once the budget
# has been exceeded.
#
# @return [Numeric]
def worker_time_remaining
  MAX_EXECUTION_TIME - (Time.now - Fanforce::Worker::LOADED_AT)
end