Class: Hekenga::Migration
- Inherits:
-
Object
- Object
- Hekenga::Migration
- Defined in:
- lib/hekenga/migration.rb
Constant Summary collapse
- MIN_TOKEN_LENGTH =
Validations
5
Instance Attribute Summary collapse
-
#active_idx ⇒ Object
Returns the value of attribute active_idx.
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#description ⇒ Object
Returns the value of attribute description.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#stamp ⇒ Object
Returns the value of attribute stamp.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#test_mode ⇒ Object
readonly
Returns the value of attribute test_mode.
Instance Method Summary collapse
- #create_log!(attrs = {}) ⇒ Object
- #desc_to_token ⇒ Object
- #execute_document_task(task_idx, executor_key, records) ⇒ Object
- #filter_out_processed(task, task_idx, records) ⇒ Object
-
#initialize ⇒ Migration
constructor
A new instance of Migration.
- #inspect ⇒ Object
- #log(task_idx = @active_idx) ⇒ Object
- #log_cancel! ⇒ Object
- #log_done! ⇒ Object
- #perform! ⇒ Object
- #perform_task!(task_idx) ⇒ Object
- #performed? ⇒ Boolean
- #performing? ⇒ Boolean
- #recover! ⇒ Object
-
#reload_logs ⇒ Object
API.
- #simple_failure!(error) ⇒ Object
- #start_document_task(task, task_idx, recover: false) ⇒ Object
- #start_parallel_task(task, task_idx) ⇒ Object
-
#start_simple_task(task) ⇒ Object
Internal perform methods.
- #task_records(task_idx) ⇒ Object
- #test_mode! ⇒ Object
-
#timestamp ⇒ Object
Internal.
- #to_key ⇒ Object
- #validate! ⇒ Object
- #validation_error(field, reason) ⇒ Object
Constructor Details
#initialize ⇒ Migration
Returns a new instance of Migration.
15 16 17 18 19 |
# File 'lib/hekenga/migration.rb', line 15 def initialize @tasks = [] @logs = {} @batch_size = 25 end |
Instance Attribute Details
#active_idx ⇒ Object
Returns the value of attribute active_idx.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def active_idx @active_idx end |
#batch_size ⇒ Object
Returns the value of attribute batch_size.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def batch_size @batch_size end |
#description ⇒ Object
Returns the value of attribute description.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def description @description end |
#session ⇒ Object (readonly)
Returns the value of attribute session.
13 14 15 |
# File 'lib/hekenga/migration.rb', line 13 def session @session end |
#stamp ⇒ Object
Returns the value of attribute stamp.
12 13 14 |
# File 'lib/hekenga/migration.rb', line 12 def stamp @stamp end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
13 14 15 |
# File 'lib/hekenga/migration.rb', line 13 def tasks @tasks end |
#test_mode ⇒ Object (readonly)
Returns the value of attribute test_mode.
13 14 15 |
# File 'lib/hekenga/migration.rb', line 13 def test_mode @test_mode end |
Instance Method Details
#create_log!(attrs = {}) ⇒ Object
46 47 48 49 50 51 |
# File 'lib/hekenga/migration.rb', line 46 def create_log!(attrs = {}) @logs[@active_idx] = Hekenga::Log.create(attrs.merge( migration: self, task_idx: @active_idx )) end |
#desc_to_token ⇒ Object
26 27 28 |
# File 'lib/hekenga/migration.rb', line 26 def desc_to_token @desc_to_token ||= self.description.gsub(/[^A-Za-z]+/,"_").gsub(/(^_|_$)/,"") end |
#execute_document_task(task_idx, executor_key, records) ⇒ Object
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/hekenga/migration.rb', line 163 def execute_document_task(task_idx, executor_key, records) task_record = Hekenga::DocumentTaskRecord.create( migration_key: to_key, task_idx: task_idx, executor_key: executor_key, test_mode: test_mode, ids: records.map(&:id) ) Hekenga::DocumentTaskExecutor.new(task_record, records: records).run! end |
#filter_out_processed(task, task_idx, records) ⇒ Object
153 154 155 156 157 158 159 160 161 |
# File 'lib/hekenga/migration.rb', line 153 def filter_out_processed(task, task_idx, records) return records if records.empty? selector = task_records(task_idx).in(ids: records.map(&:id)) processed_ids = selector.pluck(:ids).flatten.to_set records.reject do |record| processed_ids.include?(record._id) end end |
#inspect ⇒ Object
30 31 32 |
# File 'lib/hekenga/migration.rb', line 30 def inspect "<Hekenga::Migration #{self.to_key}>" end |
#log(task_idx = @active_idx) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/hekenga/migration.rb', line 38 def log(task_idx = @active_idx) raise "Missing task index" if task_idx.nil? @logs[task_idx] ||= Hekenga::Log.find_by( pkey: self.to_key, task_idx: task_idx ) end |
#log_cancel! ⇒ Object
183 184 185 186 |
# File 'lib/hekenga/migration.rb', line 183 def log_cancel! # Bypass the active transaction if there is one log.set_without_session({cancel: true, error: true, done: true, finished: Time.now}) end |
#log_done! ⇒ Object
113 114 115 |
# File 'lib/hekenga/migration.rb', line 113 def log_done! log.set_without_session({done: true, finished: Time.now}) end |
#perform! ⇒ Object
70 71 72 73 74 75 76 77 |
# File 'lib/hekenga/migration.rb', line 70 def perform! if Hekenga.status(self) == :naught Hekenga::MasterProcess.new(self).run! else Hekenga.log "This migration has already been run! Aborting." return false end end |
#perform_task!(task_idx) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/hekenga/migration.rb', line 79 def perform_task!(task_idx) task = @tasks[task_idx] or return @active_idx = task_idx case task when Hekenga::SimpleTask start_simple_task(task) when Hekenga::DocumentTask if task.parallel start_parallel_task(task, task_idx) else start_document_task(task, task_idx) end end end |
#performed? ⇒ Boolean
62 63 64 |
# File 'lib/hekenga/migration.rb', line 62 def performed? !!log(self.tasks.length - 1).done end |
#performing? ⇒ Boolean
58 59 60 |
# File 'lib/hekenga/migration.rb', line 58 def performing? Hekenga::Log.where(pkey: self.to_key, done: false).any? end |
#recover! ⇒ Object
94 95 96 |
# File 'lib/hekenga/migration.rb', line 94 def recover! Hekenga::MasterProcess.new(self).recover! end |
#reload_logs ⇒ Object
API
54 55 56 |
# File 'lib/hekenga/migration.rb', line 54 def reload_logs @logs = {} end |
#simple_failure!(error) ⇒ Object
174 175 176 177 178 179 180 181 |
# File 'lib/hekenga/migration.rb', line 174 def simple_failure!(error) log.add_failure({ message: error.to_s, backtrace: error.backtrace, simple: true }, Hekenga::Failure::Error) log_cancel! end |
#start_document_task(task, task_idx, recover: false) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/hekenga/migration.rb', line 131 def start_document_task(task, task_idx, recover: false) create_log! records = [] task_records(task_idx).delete_all unless recover executor_key = BSON::ObjectId.new Hekenga::MongoidIterator.new(scope: task.scope, cursor_timeout: task.cursor_timeout).each do |record| records.push(record) next unless records.length == (task.batch_size || batch_size) records = filter_out_processed(task, task_idx, records) next unless records.length == (task.batch_size || batch_size) execute_document_task(task_idx, executor_key, records) records = [] return if log.cancel end records = filter_out_processed(task, task_idx, records) execute_document_task(task_idx, executor_key, records) if records.any? return if log.cancel log_done! end |
#start_parallel_task(task, task_idx) ⇒ Object
117 118 119 120 121 122 123 124 125 |
# File 'lib/hekenga/migration.rb', line 117 def start_parallel_task(task, task_idx) create_log! Hekenga::ParallelTask.new( migration: self, task: task, task_idx: task_idx, test_mode: test_mode ).start! end |
#start_simple_task(task) ⇒ Object
Internal perform methods
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/hekenga/migration.rb', line 99 def start_simple_task(task) create_log! begin @context = Hekenga::Context.new(test_mode: test_mode) task.up!(@context) rescue => e simple_failure!(e) return ensure @context = nil end log_done! end |
#task_records(task_idx) ⇒ Object
127 128 129 |
# File 'lib/hekenga/migration.rb', line 127 def task_records(task_idx) Hekenga::DocumentTaskRecord.where(migration_key: to_key, task_idx: task_idx) end |
#test_mode! ⇒ Object
66 67 68 |
# File 'lib/hekenga/migration.rb', line 66 def test_mode! @test_mode = true end |
#timestamp ⇒ Object
Internal
22 23 24 |
# File 'lib/hekenga/migration.rb', line 22 def self.stamp.strftime("%Y-%m-%dT%H:%M") end |
#to_key ⇒ Object
34 35 36 |
# File 'lib/hekenga/migration.rb', line 34 def to_key @pkey ||= "#{}-#{desc_to_token}" end |
#validate! ⇒ Object
195 196 197 198 199 200 201 |
# File 'lib/hekenga/migration.rb', line 195 def validate! validation_error(:stamp, "missing") unless self.stamp.is_a?(Time) validation_error(:description, "missing") unless self.description validation_error(:description, "too short") unless self.desc_to_token.length > 5 validation_error(:tasks, "missing") if self.tasks.length.zero? true end |