Class: BatchKit::Database

Inherits:
  • Object
Defined in:
lib/batch-kit/database.rb,
lib/batch-kit/database/models.rb,
lib/batch-kit/database/schema.rb,
lib/batch-kit/database/log4r_outputter.rb,
lib/batch-kit/database/java_util_log_handler.rb

Overview

Implements functionality for persisting details of jobs run in a relational database, via the Sequel database library.

Defined Under Namespace

Classes: JavaUtilLogHandler, Job, JobRun, JobRunArg, JobRunFailure, JobRunLog, Lock, Log4ROutputter, MD5, Request, Requestor, Schema, Task, TaskRun

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ Database

Instantiate a database back-end for persisting job and task runs.

Parameters:

  • options (Hash) (defaults to: {})

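    An options hash, passed on to the Schema instance. It is also kept on the instance and consulted by #perform_housekeeping for the retention periods :log_retention_days (default 60), :job_run_retention_days (default 365) and :request_retention_days (default 90).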



(source lines 16–19)
# File 'lib/batch-kit/database.rb', line 16

# Instantiates a database back-end for persisting job and task runs.
#
# @param options [Hash] An options hash. It is retained on the instance
#   (housekeeping reads its retention settings from it) and handed to the
#   Schema instance, which provides the connection used by this class.
def initialize(options = {})
    @options = options
    @schema = Schema.new(@options)
end

Instance Method Details

#connect(*args) ⇒ Object

Connect to a back-end database for persistence.

Parameters:

  • args (Array)

    Connection arguments, passed unchanged to Schema#connect.



(source lines 32–43)
# File 'lib/batch-kit/database.rb', line 32

# Connects to the back-end database used for persistence and prepares it
# for use.
#
# The statements below must run in this order: the connection is opened
# first, then the model classes are loaded, then the schema is checked, and
# finally housekeeping is performed (which queries the models).
#
# @param args [Array] Connection arguments, passed unchanged to
#   Schema#connect.
# @return [Object] Whatever #perform_housekeeping returns.
def connect(*args)
    @schema.connect(*args)

    # We can only include the models once we have connected.
    # require_relative loads the file only once per process, so later calls
    # to connect reuse the already-loaded model classes.
    # NOTE(review): if the models bind to the connection that was active when
    # they were loaded, a second Database instance connecting elsewhere would
    # still use the first connection — confirm in database/models.rb.
    require_relative 'database/models'

    # Check if the database schema is up-to-date
    MD5.check_schema(@schema)

    # Perform housekeeping tasks
    perform_housekeeping
end

#log ⇒ Object

Log database messages under the batch.database namespace.



(source lines 23–25)
# File 'lib/batch-kit/database.rb', line 23

# Returns the logger for database messages, which are logged under the
# batch.database namespace. The logger is obtained on first use and cached
# on the instance for subsequent calls.
#
# @return [Object] The logger returned by BatchKit::LogManager.logger.
def log
    return @log if @log
    @log = BatchKit::LogManager.logger('batch.database')
end

#perform_housekeeping ⇒ Object

Purges detail records that are older than their retention thresholds, times out zombie job runs, and deletes expired locks. Housekeeping runs at most once per day.



(source lines 47–129)
# File 'lib/batch-kit/database.rb', line 47

# Performs daily maintenance of the batch database:
# - times out zombie job runs (and their still-executing task runs),
# - deletes locks that expired 6+ hours ago,
# - purges records older than the configured retention periods,
# - deletes jobs that no longer have any runs (with their tasks and failures).
#
# Housekeeping is skipped entirely if any job run has already started today,
# so that it runs at most once per day.
#
# Retention periods (in days) are read from the options passed to
# #initialize:
# - :log_retention_days     - job run log records (default 60)
# - :job_run_retention_days - job runs, job run args, task runs and any
#                             remaining log records (default 365)
# - :request_retention_days - requests and requestors (default 90)
#
# Each step runs in its own transaction block.
def perform_housekeeping
    # Only do housekeeping once per day: skip if any job run started today.
    # (Dataset#empty? checks for existence without counting every row.)
    return unless JobRun.where{job_start_time > Date.today}.empty?

    log.info "Performing batch database housekeeping"

    # Time out jobs in the EXECUTING state that started 6+ hours ago and have
    # not logged in the last 6 hours. Jobs started after the cutoff are left
    # alone: they cannot yet have gone 6 hours without logging.
    @schema.connection.transaction do
        cutoff = Time.now - 6 * 60 * 60
        exec_jobs = JobRun.where(job_status: 'EXECUTING').
            where{job_start_time < cutoff}.map(:job_run)
        curr_jobs = JobRunLog.select_group(:job_run).
            where(job_run: exec_jobs).having{max(log_time) > cutoff}.map(:job_run)
        zombie_ids = exec_jobs - curr_jobs
        unless zombie_ids.empty?
            abort_jobs = JobRun.where(job_run: zombie_ids).all
            log.detail "Cleaning up #{abort_jobs.count} zombie jobs"
            # Materialize with #all before calling #timeout: #timeout presumably
            # issues its own UPDATE, and running queries inside Dataset#each is
            # unsafe on some adapters.
            abort_tasks = TaskRun.where(job_run: zombie_ids, task_status: 'EXECUTING').all
            abort_tasks.each(&:timeout)
            abort_jobs.each(&:timeout)
        end
    end

    # Purge locks that expired 6+ hours ago
    @schema.connection.transaction do
        purge_date = Time.now - 6 * 60 * 60
        Lock.where{lock_expires_at < purge_date}.delete
    end

    # Purge log records for old job runs, flagging those runs as purged
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:log_retention_days, 60)
        purge_job_runs = JobRun.where(job_purged_flag: false).
            where{job_start_time < purge_date}.map(:job_run)
        unless purge_job_runs.empty?
            log.detail "Purging log records for #{purge_job_runs.count} job runs"
            purge_job_runs.each_slice(1000) do |purge_ids|
                JobRunLog.where(job_run: purge_ids).delete
                JobRun.where(job_run: purge_ids).update(job_purged_flag: true)
            end
        end
    end

    # Purge old job runs and their dependent records. Log records are deleted
    # here as well, in case :log_retention_days exceeds
    # :job_run_retention_days and they survived the step above.
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:job_run_retention_days, 365)
        purge_job_runs = JobRun.where{job_start_time < purge_date}.map(:job_run)
        unless purge_job_runs.empty?
            log.detail "Purging job and task run records for #{purge_job_runs.count} job runs"
            purge_job_runs.each_slice(1000) do |purge_ids|
                JobRunLog.where(job_run: purge_ids).delete
                JobRunArg.where(job_run: purge_ids).delete
                TaskRun.where(job_run: purge_ids).delete
                JobRun.where(job_run: purge_ids).delete
            end
        end
    end

    # Purge old request runs
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:request_retention_days, 90)
        purge_requests = Request.where{request_launched_at < purge_date}.map(:request_id)
        unless purge_requests.empty?
            log.detail "Purging request records for #{purge_requests.count} requests"
            purge_requests.each_slice(1000) do |purge_ids|
                # Delete requestors before the requests they refer to (the same
                # child-first order used by the other purges), so a foreign key
                # from requestor to request, if present, is not violated.
                Requestor.where(request_id: purge_ids).delete
                Request.where(request_id: purge_ids).delete
            end
        end
    end

    # Purge jobs with no remaining runs, along with their tasks and failures
    @schema.connection.transaction do
        purge_jobs = Job.left_join(:batch_job_run, :job_id => :job_id).
            where(Sequel.qualify(:batch_job_run, :job_id) => nil).
            select(Sequel.qualify(:batch_job, :job_id)).map(:job_id)
        unless purge_jobs.empty?
            log.detail "Purging #{purge_jobs.count} old jobs"
            purge_jobs.each_slice(1000) do |purge_ids|
                JobRunFailure.where(job_id: purge_ids).delete
                Task.where(job_id: purge_ids).delete
                Job.where(job_id: purge_ids).delete
            end
        end
    end
end