Class: When::Do
- Inherits: Object
  - Object
  - When::Do
- Defined in:
- lib/when-do/do.rb
Instance Attribute Summary collapse
-
#delayed_queue_key ⇒ Object
readonly
Returns the value of attribute delayed_queue_key.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
-
#pid_file_path ⇒ Object
readonly
Returns the value of attribute pid_file_path.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#schedule_key ⇒ Object
readonly
Returns the value of attribute schedule_key.
-
#worker_queue_key ⇒ Object
readonly
Returns the value of attribute worker_queue_key.
Instance Method Summary collapse
- #analyze(started_at) ⇒ Object
- #analyze_dst(started_at) ⇒ Object
- #analyze_in_child_process ⇒ Object
- #build_day_key(started_at) ⇒ Object
- #build_min_key(started_at) ⇒ Object
- #dst_forward?(started_at) ⇒ Boolean
- #enqueue(jobs) ⇒ Object
-
#initialize(opts = {}) ⇒ Do
constructor
A new instance of Do.
- #queue_delayed(started_at) ⇒ Object
- #queue_scheduled(started_at) ⇒ Object
- #running?(started_at) ⇒ Boolean
- #start_loop ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Do
Returns a new instance of Do.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/when-do/do.rb', line 10 def initialize(opts={}) @opts = opts @logger = init_logger(opts[:log_path], opts[:log_level]) Process.daemon(true) if opts.has_key?(:daemonize) @pid_file_path = opts[:pid_file_path] if pid_file_path File.open(pid_file_path, 'w') { |f| f.write(Process.pid) } end redis_opts = opts[:redis_opts] || {} @redis = Redis.new(redis_opts) @schedule_key = opts[:schedule_key] || 'when:schedules' @worker_queue_key = opts[:worker_queue_key] || 'when:queue:default' @delayed_queue_key = opts[:delayed_queue_key] || 'when:delayed' end |
Instance Attribute Details
#delayed_queue_key ⇒ Object (readonly)
Returns the value of attribute delayed_queue_key.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def delayed_queue_key @delayed_queue_key end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def logger @logger end |
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def opts @opts end |
#pid_file_path ⇒ Object (readonly)
Returns the value of attribute pid_file_path.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def pid_file_path @pid_file_path end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def redis @redis end |
#schedule_key ⇒ Object (readonly)
Returns the value of attribute schedule_key.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def schedule_key @schedule_key end |
#worker_queue_key ⇒ Object (readonly)
Returns the value of attribute worker_queue_key.
8 9 10 |
# File 'lib/when-do/do.rb', line 8 def worker_queue_key @worker_queue_key end |
Instance Method Details
#analyze(started_at) ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/when-do/do.rb', line 70 def analyze(started_at) if running?(started_at) logger.info('Another process is already analyzing.') else analyze_dst(started_at) if dst_forward?(started_at) logger.debug { "Analyzing #{started_at}." } queue_scheduled(started_at) queue_delayed(started_at) end end |
#analyze_dst(started_at) ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/when-do/do.rb', line 99 def analyze_dst(started_at) logger.info { "DST forward shift detected. Triggering analysis for #{started_at.hour - 1}:00 through #{started_at.hour - 1}:59"} skipped_time = Time.new(started_at.year, started_at.month, started_at.day, started_at.hour - 1, 0, 0, started_at.utc_offset - 3600) (0..59).each do |min| analyze(skipped_time + min * 60) end end |
#analyze_in_child_process ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/when-do/do.rb', line 55 def analyze_in_child_process if pid = fork Thread.new { pid, status = Process.wait2(pid) if status.exitstatus != 0 raise "Child (pid: #{pid} exited with non-zero status. Check logs." end }.abort_on_exception = true else ['HUP', 'INT', 'TERM', 'QUIT'].each { |sig| Signal.trap(sig) { }} analyze(Time.now) exit end end |
#build_day_key(started_at) ⇒ Object
107 108 109 |
# File 'lib/when-do/do.rb', line 107 def build_day_key(started_at) "#{schedule_key}:#{started_at.to_s.split(' ')[0]}" end |
#build_min_key(started_at) ⇒ Object
111 112 113 |
# File 'lib/when-do/do.rb', line 111 def build_min_key(started_at) "#{started_at.hour}:#{started_at.min}" end |
#dst_forward?(started_at) ⇒ Boolean
95 96 97 |
# File 'lib/when-do/do.rb', line 95 def dst_forward?(started_at) started_at.hour - (started_at - 60).hour == 2 end |
#enqueue(jobs) ⇒ Object
147 148 149 150 151 152 153 154 155 156 |
# File 'lib/when-do/do.rb', line 147 def enqueue(jobs) jobs.each do |job| logger.info("Queueing: #{job}") queue = job['queue'] || worker_queue_key success = redis.lpush(queue, job.to_json) unless success > 0 raise "Failed to queue #{job}. Redis returned #{success}." end end end |
#queue_delayed(started_at) ⇒ Object
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/when-do/do.rb', line 136 def queue_delayed(started_at) logger.info("Checking for delayed jobs.") raw_delayed_jobs = redis.multi do redis.zrevrangebyscore(delayed_queue_key, started_at.to_i, '-inf') redis.zremrangebyscore(delayed_queue_key, '-inf', started_at.to_i) end[0] delayed_jobs = raw_delayed_jobs.map { |j| JSON.parse(j) } logger.debug { "Found #{delayed_jobs.count} delayed jobs." } enqueue(delayed_jobs) if delayed_jobs.any? end |
#queue_scheduled(started_at) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/when-do/do.rb', line 115 def queue_scheduled(started_at) schedules = redis.hvals(schedule_key) logger.info("Analyzing #{schedules.count} schedules.") scheduled_jobs = schedules.inject([]) do |jobs, s| schedule = JSON.parse(s) if (cron = When::Cron.valid(schedule['cron'])) if cron == started_at job = schedule.merge('jid' => SecureRandom.uuid) job.delete('cron') jobs << job end else logger.error { "Could not interpret cron for #{schedule.inspect}" } end jobs end logger.debug { "Found #{scheduled_jobs.count} schedules due to be queued." } enqueue(scheduled_jobs) if scheduled_jobs.any? end |
#running?(started_at) ⇒ Boolean
81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/when-do/do.rb', line 81 def running?(started_at) day_key = build_day_key(started_at) min_key = build_min_key(started_at) logger.debug { "Checking Redis using day_key: '#{day_key}' and min_key: '#{min_key}'"} check_and_set_analyzed = redis.multi do redis.hget(day_key, min_key) redis.hset(day_key, min_key, 't') redis.expire(day_key, 60 * 60 * 24) end check_and_set_analyzed[0] end |
#start_loop ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/when-do/do.rb', line 29 def start_loop logger.info("Starting...") logger.info { "Schedule key: '#{schedule_key}', worker queue key: '#{worker_queue_key}', delayed queue key: '#{delayed_queue_key}'" } logger.info { "PID file: #{pid_file_path}" } if pid_file_path Signal.trap('USR1') { @logger = init_logger(opts[:log_path], opts[:log_level]) } loop do sleep_until_next_minute logger.debug { "Using #{`ps -o rss -p #{Process.pid}`.chomp.split("\n").last.to_i} kb of memory." } analyze_in_child_process end rescue SystemExit => e raise e rescue SignalException => e logger.info(e.inspect) File.delete(pid_file_path) if pid_file_path raise e rescue Exception => e ([e.inspect] + e.backtrace).each { |line| logger.fatal(line) } raise e end |