Class: BatchKit::Database
- Inherits: Object
- Ancestors: Object → BatchKit::Database
- Defined in:
- lib/batch-kit/database.rb,
lib/batch-kit/database/models.rb,
lib/batch-kit/database/schema.rb,
lib/batch-kit/database/log4r_outputter.rb,
lib/batch-kit/database/java_util_log_handler.rb
Overview
Implements functionality for persisting details of jobs run in a relational database, via the Sequel database library.
Defined Under Namespace
Classes: JavaUtilLogHandler, Job, JobRun, JobRunArg, JobRunFailure, JobRunLog, Lock, Log4ROutputter, MD5, Request, Requestor, Schema, Task, TaskRun
Instance Method Summary
-
#connect(*args) ⇒ Object
Connect to a back-end database for persistence.
-
#initialize(options = {}) ⇒ Database
constructor
Instantiate a database back-end for persisting job and task runs.
-
#log ⇒ Object
Log database messages under the batch.database namespace.
-
#perform_housekeeping ⇒ Object
Purges detail records that are older than the retention threshold.
Constructor Details
Instance Method Details
#connect(*args) ⇒ Object
Connect to a back-end database for persistence.
Source: lib/batch-kit/database.rb, lines 32–43
# File 'lib/batch-kit/database.rb', line 32
# Connects to the back-end database used for persistence.
#
# All arguments are handed straight through to the schema's +connect+.
# After the connection is open, the model classes are loaded,
# MD5.check_schema is run against the schema (to check it is up to date),
# and finally a housekeeping pass is performed.
def connect(*args)
  @schema.connect(*args)

  # Models may only be loaded after a connection has been established.
  require_relative('database/models')

  # Confirm the schema is current, then tidy up stale records.
  MD5.check_schema(@schema)
  perform_housekeeping
end
#log ⇒ Object
Log database messages under the batch.database namespace.
Source: lib/batch-kit/database.rb, lines 23–25
# File 'lib/batch-kit/database.rb', line 23
# Returns the logger for the 'batch.database' namespace.
#
# The logger is obtained from BatchKit::LogManager on first use and cached
# in @log, so later calls return the same object.
def log
  @log || (@log = BatchKit::LogManager.logger('batch.database'))
end
#perform_housekeeping ⇒ Object
Purges detail records that are older than the retention threshold.
Source: lib/batch-kit/database.rb, lines 47–129
# File 'lib/batch-kit/database.rb', line 47
# Purges detail records that are older than the retention threshold, and
# times out job runs that appear to have died while executing.
#
# Housekeeping runs at most once per day: if any job run has a
# job_start_time after Date.today, this returns without doing anything.
#
# Each phase runs in its own transaction, so work committed by an earlier
# phase is kept even if a later phase fails.
#
# Retention periods are read from @options and are measured in days:
#   :log_retention_days     - job run log records (default 60)
#   :job_run_retention_days - job run, task run and job run arg records (default 365)
#   :request_retention_days - request and requestor records (default 90)
def perform_housekeeping
  # Only do housekeeping once per day. Dataset#empty? only has to find one
  # matching row, whereas #count has to count all of them.
  return unless JobRun.where{job_start_time > Date.today}.empty?
  log.info "Performing batch database housekeeping"

  abort_zombie_job_runs
  purge_expired_locks
  purge_old_job_run_logs
  purge_old_job_runs
  purge_old_requests
  purge_orphaned_jobs
end

# Times out EXECUTING job runs (and their EXECUTING task runs) that have
# been running for 6+ hours without writing a log record in that time.
def abort_zombie_job_runs
  @schema.connection.transaction do
    cutoff = Time.now - 6 * 60 * 60
    # A run that started less than 6 hours ago may simply not have logged
    # anything yet, so it is not treated as a zombie candidate.
    exec_jobs = JobRun.where(job_status: 'EXECUTING').
      where{job_start_time < cutoff}.map(:job_run)
    curr_jobs = JobRunLog.select_group(:job_run).
      where(job_run: exec_jobs).having{max(log_time) > cutoff}.map(:job_run)
    abort_jobs = JobRun.where(job_run: exec_jobs - curr_jobs).all
    unless abort_jobs.empty?
      log.detail "Cleaning up #{abort_jobs.count} zombie jobs"
      abort_tasks = TaskRun.where(job_run: abort_jobs.map(&:id), task_status: 'EXECUTING')
      abort_tasks.each(&:timeout)
      abort_jobs.each(&:timeout)
    end
  end
end

# Deletes locks whose lock_expires_at is more than 6 hours in the past.
def purge_expired_locks
  @schema.connection.transaction do
    purge_date = Time.now - 6 * 60 * 60
    Lock.where{lock_expires_at < purge_date}.delete
  end
end

# Deletes log records of not-yet-purged job runs older than
# :log_retention_days, and flags those runs as purged.
def purge_old_job_run_logs
  @schema.connection.transaction do
    purge_date = Date.today - @options.fetch(:log_retention_days, 60)
    job_runs = JobRun.where(job_purged_flag: false).
      where{job_start_time < purge_date}.map(:job_run)
    unless job_runs.empty?
      log.detail "Purging log records for #{job_runs.count} job runs"
      # Work in slices so each IN (...) list holds at most 1000 ids.
      job_runs.each_slice(1000) do |purge_ids|
        JobRunLog.where(job_run: purge_ids).delete
        JobRun.where(job_run: purge_ids).update(job_purged_flag: true)
      end
    end
  end
end

# Deletes job runs older than :job_run_retention_days, together with their
# log, argument and task run records.
def purge_old_job_runs
  @schema.connection.transaction do
    purge_date = Date.today - @options.fetch(:job_run_retention_days, 365)
    job_runs = JobRun.where{job_start_time < purge_date}.map(:job_run)
    unless job_runs.empty?
      log.detail "Purging job and task run records for #{job_runs.count} job runs"
      job_runs.each_slice(1000) do |purge_ids|
        # Log rows are normally already gone (purge_old_job_run_logs), but not
        # when :log_retention_days exceeds :job_run_retention_days; remove
        # them here so no log rows are left pointing at deleted job runs.
        JobRunLog.where(job_run: purge_ids).delete
        JobRunArg.where(job_run: purge_ids).delete
        TaskRun.where(job_run: purge_ids).delete
        JobRun.where(job_run: purge_ids).delete
      end
    end
  end
end

# Deletes requests launched before :request_retention_days ago, along with
# their requestor records.
def purge_old_requests
  @schema.connection.transaction do
    purge_date = Date.today - @options.fetch(:request_retention_days, 90)
    requests = Request.where{request_launched_at < purge_date}.map(:request_id)
    unless requests.empty?
      log.detail "Purging request records for #{requests.count} requests"
      requests.each_slice(1000) do |purge_ids|
        # Requestor rows carry a request_id, so delete them before the
        # Request rows (child before parent, as the other purges do).
        # NOTE(review): assumes Requestor references Request - confirm in schema.
        Requestor.where(request_id: purge_ids).delete
        Request.where(request_id: purge_ids).delete
      end
    end
  end
end

# Deletes jobs that no longer have any job runs, along with their failure
# and task records.
def purge_orphaned_jobs
  @schema.connection.transaction do
    jobs = Job.left_join(:batch_job_run, :job_id => :job_id).
      where(Sequel.qualify(:batch_job_run, :job_id) => nil).
      select(Sequel.qualify(:batch_job, :job_id)).map(:job_id)
    unless jobs.empty?
      log.detail "Purging #{jobs.count} old jobs"
      jobs.each_slice(1000) do |purge_ids|
        JobRunFailure.where(job_id: purge_ids).delete
        Task.where(job_id: purge_ids).delete
        Job.where(job_id: purge_ids).delete
      end
    end
  end
end

private :abort_zombie_job_runs, :purge_expired_locks, :purge_old_job_run_logs,
        :purge_old_job_runs, :purge_old_requests, :purge_orphaned_jobs