Class: Hekenga::Migration
- Inherits:
-
Object
- Object
- Hekenga::Migration
- Defined in:
- lib/hekenga/migration.rb
Constant Summary collapse
- MIN_TOKEN_LENGTH =
Validations
5
Instance Attribute Summary collapse
-
#active_idx ⇒ Object
Returns the value of attribute active_idx.
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#description ⇒ Object
Returns the value of attribute description.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#stamp ⇒ Object
Returns the value of attribute stamp.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#test_mode ⇒ Object
readonly
Returns the value of attribute test_mode.
Instance Method Summary collapse
- #create_log!(attrs = {}) ⇒ Object
- #desc_to_token ⇒ Object
- #execute_document_task(task_idx, executor_key, records) ⇒ Object
- #filter_out_processed(task, task_idx, records) ⇒ Object
-
#initialize ⇒ Migration
constructor
A new instance of Migration.
- #inspect ⇒ Object
- #log(task_idx = @active_idx) ⇒ Object
- #log_cancel! ⇒ Object
- #log_done! ⇒ Object
- #perform! ⇒ Object
- #perform_task!(task_idx) ⇒ Object
- #performed? ⇒ Boolean
- #performing? ⇒ Boolean
- #recover! ⇒ Object
-
#reload_logs ⇒ Object
API.
- #simple_failure!(error) ⇒ Object
- #start_document_task(task, task_idx, recover: false) ⇒ Object
- #start_parallel_task(task, task_idx) ⇒ Object
-
#start_simple_task(task) ⇒ Object
Internal perform methods.
- #task_records(task_idx) ⇒ Object
- #test_mode! ⇒ Object
-
#timestamp ⇒ Object
Internal.
- #to_key ⇒ Object
- #validate! ⇒ Object
- #validation_error(field, reason) ⇒ Object
Constructor Details
#initialize ⇒ Migration
Returns a new instance of Migration.
14 15 16 17 18 |
# File 'lib/hekenga/migration.rb', line 14 def initialize @tasks = [] @logs = {} @batch_size = 25 end |
Instance Attribute Details
#active_idx ⇒ Object
Returns the value of attribute active_idx.
11 12 13 |
# File 'lib/hekenga/migration.rb', line 11 def active_idx @active_idx end |
#batch_size ⇒ Object
Returns the value of attribute batch_size.
11 12 13 |
# File 'lib/hekenga/migration.rb', line 11 def batch_size @batch_size end |
#description ⇒ Object
Returns the value of attribute description.
11 12 13 |
# File 'lib/hekenga/migration.rb', line 11 def description @description end |
#session ⇒ Object (readonly)
Returns the value of attribute session.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def session @session end |
#stamp ⇒ Object
Returns the value of attribute stamp.
11 12 13 |
# File 'lib/hekenga/migration.rb', line 11 def stamp @stamp end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def tasks @tasks end |
#test_mode ⇒ Object (readonly)
Returns the value of attribute test_mode.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def test_mode @test_mode end |
Instance Method Details
#create_log!(attrs = {}) ⇒ Object
45 46 47 48 49 50 |
# File 'lib/hekenga/migration.rb', line 45 def create_log!(attrs = {}) @logs[@active_idx] = Hekenga::Log.create(attrs.merge( migration: self, task_idx: @active_idx )) end |
#desc_to_token ⇒ Object
25 26 27 |
# File 'lib/hekenga/migration.rb', line 25 def desc_to_token @desc_to_token ||= self.description.gsub(/[^A-Za-z]+/,"_").gsub(/(^_|_$)/,"") end |
#execute_document_task(task_idx, executor_key, records) ⇒ Object
162 163 164 165 166 167 168 169 170 171 |
# File 'lib/hekenga/migration.rb', line 162 def execute_document_task(task_idx, executor_key, records) task_record = Hekenga::DocumentTaskRecord.create( migration_key: to_key, task_idx: task_idx, executor_key: executor_key, test_mode: test_mode, ids: records.map(&:id) ) Hekenga::DocumentTaskExecutor.new(task_record, records: records).run! end |
#filter_out_processed(task, task_idx, records) ⇒ Object
152 153 154 155 156 157 158 159 160 |
# File 'lib/hekenga/migration.rb', line 152 def filter_out_processed(task, task_idx, records) return records if records.empty? selector = task_records(task_idx).in(ids: records.map(&:id)) processed_ids = selector.pluck(:ids).flatten.to_set records.reject do |record| processed_ids.include?(record._id) end end |
#inspect ⇒ Object
29 30 31 |
# File 'lib/hekenga/migration.rb', line 29 def inspect "<Hekenga::Migration #{self.to_key}>" end |
#log(task_idx = @active_idx) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/hekenga/migration.rb', line 37 def log(task_idx = @active_idx) raise "Missing task index" if task_idx.nil? @logs[task_idx] ||= Hekenga::Log.find_by( pkey: self.to_key, task_idx: task_idx ) end |
#log_cancel! ⇒ Object
182 183 184 185 |
# File 'lib/hekenga/migration.rb', line 182 def log_cancel! # Bypass the active transaction if there is one log.set_without_session({cancel: true, error: true, done: true, finished: Time.now}) end |
#log_done! ⇒ Object
112 113 114 |
# File 'lib/hekenga/migration.rb', line 112 def log_done! log.set_without_session({done: true, finished: Time.now}) end |
#perform! ⇒ Object
69 70 71 72 73 74 75 76 |
# File 'lib/hekenga/migration.rb', line 69 def perform! if Hekenga.status(self) == :naught Hekenga::MasterProcess.new(self).run! else Hekenga.log "This migration has already been run! Aborting." return false end end |
#perform_task!(task_idx) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/hekenga/migration.rb', line 78 def perform_task!(task_idx) task = @tasks[task_idx] or return @active_idx = task_idx case task when Hekenga::SimpleTask start_simple_task(task) when Hekenga::DocumentTask if task.parallel start_parallel_task(task, task_idx) else start_document_task(task, task_idx) end end end |
#performed? ⇒ Boolean
61 62 63 |
# File 'lib/hekenga/migration.rb', line 61 def performed? !!log(self.tasks.length - 1).done end |
#performing? ⇒ Boolean
57 58 59 |
# File 'lib/hekenga/migration.rb', line 57 def performing? Hekenga::Log.where(pkey: self.to_key, done: false).any? end |
#recover! ⇒ Object
93 94 95 |
# File 'lib/hekenga/migration.rb', line 93 def recover! Hekenga::MasterProcess.new(self).recover! end |
#reload_logs ⇒ Object
API
53 54 55 |
# File 'lib/hekenga/migration.rb', line 53 def reload_logs @logs = {} end |
#simple_failure!(error) ⇒ Object
173 174 175 176 177 178 179 180 |
# File 'lib/hekenga/migration.rb', line 173 def simple_failure!(error) log.add_failure({ message: error.to_s, backtrace: error.backtrace, simple: true }, Hekenga::Failure::Error) log_cancel! end |
#start_document_task(task, task_idx, recover: false) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/hekenga/migration.rb', line 130 def start_document_task(task, task_idx, recover: false) create_log! records = [] task_records(task_idx).delete_all unless recover executor_key = BSON::ObjectId.new task.scope.asc(:_id).no_timeout.each do |record| records.push(record) next unless records.length == (task.batch_size || batch_size) records = filter_out_processed(task, task_idx, records) if recover next unless records.length == (task.batch_size || batch_size) execute_document_task(task_idx, executor_key, records) records = [] return if log.cancel end records = filter_out_processed(task, task_idx, records) if recover execute_document_task(task_idx, executor_key, records) if records.any? return if log.cancel log_done! end |
#start_parallel_task(task, task_idx) ⇒ Object
116 117 118 119 120 121 122 123 124 |
# File 'lib/hekenga/migration.rb', line 116 def start_parallel_task(task, task_idx) create_log! Hekenga::ParallelTask.new( migration: self, task: task, task_idx: task_idx, test_mode: test_mode ).start! end |
#start_simple_task(task) ⇒ Object
Internal perform methods
98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/hekenga/migration.rb', line 98 def start_simple_task(task) create_log! begin @context = Hekenga::Context.new(test_mode: test_mode) task.up!(@context) rescue => e simple_failure!(e) return ensure @context = nil end log_done! end |
#task_records(task_idx) ⇒ Object
126 127 128 |
# File 'lib/hekenga/migration.rb', line 126 def task_records(task_idx) Hekenga::DocumentTaskRecord.where(migration_key: to_key, task_idx: task_idx) end |
#test_mode! ⇒ Object
65 66 67 |
# File 'lib/hekenga/migration.rb', line 65 def test_mode! @test_mode = true end |
#timestamp ⇒ Object
Internal
21 22 23 |
# File 'lib/hekenga/migration.rb', line 21 def self.stamp.strftime("%Y-%m-%dT%H:%M") end |
#to_key ⇒ Object
33 34 35 |
# File 'lib/hekenga/migration.rb', line 33 def to_key @pkey ||= "#{}-#{desc_to_token}" end |
#validate! ⇒ Object
194 195 196 197 198 199 200 |
# File 'lib/hekenga/migration.rb', line 194 def validate! validation_error(:stamp, "missing") unless self.stamp.is_a?(Time) validation_error(:description, "missing") unless self.description validation_error(:description, "too short") unless self.desc_to_token.length > 5 validation_error(:tasks, "missing") if self.tasks.length.zero? true end |