Class: Hekenga::Migration

Inherits:
Object
  • Object
show all
Defined in:
lib/hekenga/migration.rb

Constant Summary collapse

MIN_TOKEN_LENGTH = 5

Validations

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Migration

Returns a new instance of Migration.



14
15
16
17
18
# File 'lib/hekenga/migration.rb', line 14

# Sets up an empty migration.
#
# Starts with no tasks, an empty per-task log cache, and a default batch size
# of 25 (document tasks fall back to this when they do not set their own).
def initialize
  @batch_size = 25
  @logs       = {}
  @tasks      = []
end

Instance Attribute Details

#active_idx ⇒ Object

Returns the value of attribute active_idx.



11
12
13
# File 'lib/hekenga/migration.rb', line 11

# Reader for @active_idx.
#
# perform_task! assigns the index of the task it is about to run to this
# attribute; create_log! and log use it as the default task index.
#
# @return [Object, nil] the stored index, or nil when nothing has been assigned
def active_idx
  @active_idx
end

#batch_size ⇒ Object

Returns the value of attribute batch_size.



11
12
13
# File 'lib/hekenga/migration.rb', line 11

# Reader for @batch_size.
#
# initialize sets this to 25; start_document_task uses it as the batch size
# whenever the task's own batch_size is nil/false.
#
# @return [Object] the migration-wide default batch size
def batch_size
  @batch_size
end

#description ⇒ Object

Returns the value of attribute description.



11
12
13
# File 'lib/hekenga/migration.rb', line 11

# Reader for @description.
#
# desc_to_token derives the textual half of the migration key from this value,
# and validate! rejects the migration when it is missing.
#
# @return [Object, nil] the description, or nil when none has been assigned
def description
  @description
end

#session ⇒ Object (readonly)

Returns the value of attribute session.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

# Read-only accessor for @session.
#
# NOTE(review): none of the methods shown on this page assign @session, so
# where it is populated (if at all) needs confirming against the full source.
#
# @return [Object, nil] the stored session, or nil when unset
def session
  @session
end

#stamp ⇒ Object

Returns the value of attribute stamp.



11
12
13
# File 'lib/hekenga/migration.rb', line 11

# Reader for @stamp.
#
# validate! requires this to be a Time, and timestamp formats it to build the
# first half of the migration key.
#
# @return [Object, nil] the stored stamp, or nil when none has been assigned
def stamp
  @stamp
end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

# Read-only accessor for @tasks.
#
# initialize starts this as an empty array; perform_task! looks tasks up in it
# by index and validate! requires it to be non-empty.
#
# @return [Array] the tasks registered on this migration
def tasks
  @tasks
end

#test_mode ⇒ Object (readonly)

Returns the value of attribute test_mode.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

# Read-only accessor for @test_mode.
#
# The flag is only ever set (to true) by test_mode!; initialize leaves it
# unset. It is forwarded to the context, parallel task and document task
# records created by this migration.
#
# @return [true, nil] true once test_mode! has been called, otherwise nil
def test_mode
  @test_mode
end

Instance Method Details

#create_log!(attrs = {}) ⇒ Object



45
46
47
48
49
50
# File 'lib/hekenga/migration.rb', line 45

# Creates a Hekenga::Log for the currently active task and caches it.
#
# The :migration and :task_idx entries are always set from this migration and
# @active_idx, overriding any values supplied in +attrs+.
#
# @param attrs [Hash] additional attributes for the log
# @return [Object] whatever Hekenga::Log.create returned
def create_log!(attrs = {})
  log_attrs = attrs.merge(migration: self, task_idx: @active_idx)
  @logs[@active_idx] = Hekenga::Log.create(log_attrs)
end

#desc_to_token ⇒ Object



25
26
27
# File 'lib/hekenga/migration.rb', line 25

# Turns the description into the token used in the migration key.
#
# Every run of characters other than ASCII letters becomes a single
# underscore, then one leading and one trailing underscore are removed.
# The result is memoized, so later changes to the description are ignored.
#
# @return [String] the underscore-separated token
def desc_to_token
  @desc_to_token ||= begin
    underscored = description.gsub(/[^A-Za-z]+/,"_")
    underscored.gsub(/(^_|_$)/,"")
  end
end

#execute_document_task(task_idx, executor_key, records) ⇒ Object



162
163
164
165
166
167
168
169
170
171
# File 'lib/hekenga/migration.rb', line 162

# Records one batch of documents and runs it through a document task executor.
#
# A Hekenga::DocumentTaskRecord is created carrying the migration key, task
# index, executor key, test-mode flag and the ids of +records+; the executor
# is then given that record together with the loaded documents.
#
# @param task_idx [Integer] index of the task the batch belongs to
# @param executor_key [Object] key identifying the executing run
# @param records [Array] documents in this batch (each must respond to #id)
# @return [Object] whatever the executor's run! returns
def execute_document_task(task_idx, executor_key, records)
  record_ids  = records.map(&:id)
  task_record = Hekenga::DocumentTaskRecord.create(
    migration_key: to_key,
    task_idx:      task_idx,
    executor_key:  executor_key,
    test_mode:     test_mode,
    ids:           record_ids
  )
  executor = Hekenga::DocumentTaskExecutor.new(task_record, records: records)
  executor.run!
end

#filter_out_processed(task, task_idx, records) ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/hekenga/migration.rb', line 152

# Drops the documents that an existing task record already lists.
#
# Looks up the task records for +task_idx+ whose ids overlap the given
# documents, collects every id they contain, and rejects the documents whose
# _id is among them. An empty input is returned untouched without querying.
#
# @param task [Object] the task being run (not used by this method)
# @param task_idx [Integer] index of the task whose records are consulted
# @param records [Array] candidate documents
# @return [Array] the documents not yet covered by a task record
def filter_out_processed(task, task_idx, records)
  return records if records.empty?

  candidate_ids = records.map(&:id)
  matching      = task_records(task_idx).in(ids: candidate_ids)
  already_done  = matching.pluck(:ids).flatten.to_set
  records.reject { |record| already_done.include?(record._id) }
end

#inspect ⇒ Object



29
30
31
# File 'lib/hekenga/migration.rb', line 29

def inspect
  "<Hekenga::Migration #{self.to_key}>"
end

#log(task_idx = @active_idx) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/hekenga/migration.rb', line 37

# Fetches (and caches) the log for a task of this migration.
#
# A cached entry in @logs is returned as-is; otherwise the log is looked up by
# migration key and task index and the result is stored for next time. A
# nil/false lookup result is not retained, so the lookup is retried on the
# next call.
#
# @param task_idx [Integer] task index; defaults to the active task
# @return [Object] the log for that task
# @raise [RuntimeError] when +task_idx+ is nil
def log(task_idx = @active_idx)
  raise "Missing task index" if task_idx.nil?

  cached = @logs[task_idx]
  return cached if cached

  @logs[task_idx] = Hekenga::Log.find_by(pkey: to_key, task_idx: task_idx)
end

#log_cancel! ⇒ Object



182
183
184
185
# File 'lib/hekenga/migration.rb', line 182

# Flags the active task's log as cancelled, errored and done, stamping the
# finish time.
#
# The write goes through set_without_session so that it bypasses the active
# transaction if there is one.
#
# @return [Object] whatever set_without_session returns
def log_cancel!
  cancelled_state = {
    cancel:   true,
    error:    true,
    done:     true,
    finished: Time.now
  }
  log.set_without_session(cancelled_state)
end

#log_done! ⇒ Object



112
113
114
# File 'lib/hekenga/migration.rb', line 112

# Marks the active task's log as done and stamps the finish time.
#
# @return [Object] whatever set_without_session returns
def log_done!
  finished_state = { done: true, finished: Time.now }
  log.set_without_session(finished_state)
end

#perform! ⇒ Object



69
70
71
72
73
74
75
76
# File 'lib/hekenga/migration.rb', line 69

# Runs the migration through a master process, unless it has a status other
# than :naught (i.e. it has been run before).
#
# @return [Object, false] the master process's run! result, or false when the
#   migration was skipped
def perform!
  unless Hekenga.status(self) == :naught
    Hekenga.log "This migration has already been run! Aborting."
    return false
  end

  Hekenga::MasterProcess.new(self).run!
end

#perform_task!(task_idx) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/hekenga/migration.rb', line 78

# Runs the task at +task_idx+, dispatching on its type.
#
# Does nothing (and leaves @active_idx alone) when there is no task at that
# index. Otherwise the index becomes the active one and the task is started:
# simple tasks directly, document tasks either in parallel or sequentially
# depending on the task's +parallel+ flag. Tasks of any other type are ignored.
#
# @param task_idx [Integer] position of the task in @tasks
# @return [Object, nil] the result of the start_* method, or nil
def perform_task!(task_idx)
  task = @tasks[task_idx]
  return unless task

  @active_idx = task_idx
  case task
  when Hekenga::SimpleTask
    start_simple_task(task)
  when Hekenga::DocumentTask
    return start_parallel_task(task, task_idx) if task.parallel

    start_document_task(task, task_idx)
  end
end

#performed? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/hekenga/migration.rb', line 61

# Whether the last task of this migration has been logged as done.
#
# The log lookup can come back empty (no log written yet for the final task),
# in which case the migration has not been performed; safe navigation turns
# that into false instead of a NoMethodError on nil.
#
# @return [Boolean] true only when the final task's log exists and is done
def performed?
  final_log = log(tasks.length - 1)
  !!final_log&.done
end

#performing? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/hekenga/migration.rb', line 57

# Whether any log belonging to this migration is still not done.
#
# @return [Boolean] true when at least one log with this migration's key has
#   done == false
def performing?
  unfinished = Hekenga::Log.where(pkey: to_key, done: false)
  unfinished.any?
end

#recover! ⇒ Object



93
94
95
# File 'lib/hekenga/migration.rb', line 93

# Hands this migration to a master process for recovery.
#
# @return [Object] whatever the master process's recover! returns
def recover!
  master = Hekenga::MasterProcess.new(self)
  master.recover!
end

#reload_logs ⇒ Object

API



53
54
55
# File 'lib/hekenga/migration.rb', line 53

# Discards every cached log so that the next call to log looks each one up
# again instead of returning the memoized object.
#
# @return [Hash] the new, empty cache
def reload_logs
  @logs = {}
end

#simple_failure!(error) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/hekenga/migration.rb', line 173

# Records an exception raised by a simple task and cancels the task's log.
#
# The failure is stored on the active log with the error's message and
# backtrace, flagged as +simple+, using the Hekenga::Failure::Error type.
#
# @param error [Exception] the exception that aborted the task
# @return [Object] whatever log_cancel! returns
def simple_failure!(error)
  details = {
    message:   error.to_s,
    backtrace: error.backtrace,
    simple:    true
  }
  log.add_failure(details, Hekenga::Failure::Error)
  log_cancel!
end

#start_document_task(task, task_idx, recover: false) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/hekenga/migration.rb', line 130

# Runs a document task sequentially, one batch at a time.
#
# Creates the task log, clears earlier task records (unless recovering), then
# walks the task's scope in ascending _id order, collecting documents until a
# batch is full. The batch size is the task's own batch_size, falling back to
# the migration's. When recovering, documents that an existing task record
# already lists are filtered out of each batch before it is executed; a batch
# that is no longer full after filtering keeps accumulating. Processing stops
# as soon as the log reports cancellation, in which case the log is not marked
# done.
#
# @param task [Object] the document task (responds to #scope and #batch_size)
# @param task_idx [Integer] index of the task within the migration
# @param recover [Boolean] resume a previous run instead of starting fresh
# @return [Object, nil] the result of log_done!, or nil when cancelled
def start_document_task(task, task_idx, recover: false)
  create_log!
  task_records(task_idx).delete_all unless recover
  executor_key = BSON::ObjectId.new
  batch = []
  task.scope.asc(:_id).no_timeout.each do |document|
    batch << document
    next if batch.length != (task.batch_size || batch_size)

    if recover
      # Filtering may shrink the batch below the limit; keep filling it then.
      batch = filter_out_processed(task, task_idx, batch)
      next if batch.length != (task.batch_size || batch_size)
    end

    execute_document_task(task_idx, executor_key, batch)
    batch = []
    return if log.cancel
  end

  # Flush whatever is left over from the final, partially filled batch.
  batch = filter_out_processed(task, task_idx, batch) if recover
  execute_document_task(task_idx, executor_key, batch) if batch.any?
  return if log.cancel

  log_done!
end

#start_parallel_task(task, task_idx) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/hekenga/migration.rb', line 116

# Creates the task log and starts the task as a Hekenga::ParallelTask.
#
# @param task [Object] the document task to run in parallel
# @param task_idx [Integer] index of the task within the migration
# @return [Object] whatever the parallel task's start! returns
def start_parallel_task(task, task_idx)
  create_log!
  parallel_task = Hekenga::ParallelTask.new(
    migration: self,
    task:      task,
    task_idx:  task_idx,
    test_mode: test_mode
  )
  parallel_task.start!
end

#start_simple_task(task) ⇒ Object

Internal perform methods



98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/hekenga/migration.rb', line 98

# Runs a simple task inside a fresh Hekenga::Context.
#
# The log is created first (outside the rescue, so a failure there
# propagates). Any StandardError raised while building the context or running
# the task is recorded through simple_failure! and the method returns nil
# without marking the log done. The context reference is always cleared.
#
# @param task [Object] the simple task (responds to #up!)
# @return [Object, nil] the result of log_done!, or nil when the task failed
def start_simple_task(task)
  create_log!
  begin
    @context = Hekenga::Context.new(test_mode: test_mode)
    task.up!(@context)
  rescue StandardError => error
    simple_failure!(error)
    return
  ensure
    # Drop the context whether the task succeeded or not.
    @context = nil
  end
  log_done!
end

#task_records(task_idx) ⇒ Object



126
127
128
# File 'lib/hekenga/migration.rb', line 126

# Query for the document task records written for one task of this migration.
#
# @param task_idx [Integer] index of the task
# @return [Object] the result of Hekenga::DocumentTaskRecord.where, scoped by
#   migration key and task index
def task_records(task_idx)
  conditions = { migration_key: to_key, task_idx: task_idx }
  Hekenga::DocumentTaskRecord.where(**conditions)
end

#test_mode! ⇒ Object



65
66
67
# File 'lib/hekenga/migration.rb', line 65

# Switches the migration into test mode.
#
# The flag is later forwarded to the contexts, parallel tasks and document
# task records this migration creates. Nothing shown here switches it off.
#
# @return [true]
def test_mode!
  @test_mode = true
end

#timestamp ⇒ Object

Internal



21
22
23
# File 'lib/hekenga/migration.rb', line 21

# Formats the stamp to minute precision for use in the migration key.
#
# The stamp is formatted as-is, i.e. in whatever zone the Time object carries.
#
# @return [String] e.g. "2021-03-04T05:06"
def timestamp
  moment = stamp
  moment.strftime("%Y-%m-%dT%H:%M")
end

#to_key ⇒ Object



33
34
35
# File 'lib/hekenga/migration.rb', line 33

# The migration's key: its timestamp and description token joined by a dash.
#
# Computed once and memoized in @pkey, so later changes to the stamp or the
# description do not alter the key.
#
# @return [String] e.g. "2021-03-04T05:06-Some_token"
def to_key
  return @pkey if @pkey

  @pkey = "#{timestamp}-#{desc_to_token}"
end

#validate! ⇒ Object



194
195
196
197
198
199
200
# File 'lib/hekenga/migration.rb', line 194

# Checks that the migration is fully specified before it is used.
#
# Fails through validation_error when the stamp is not a Time, the
# description is missing, the description's token is too short, or no tasks
# have been registered.
#
# @return [true] when every check passes
# @raise [Hekenga::Invalid] (via validation_error) on the first failed check
def validate!
  validation_error(:stamp,       "missing")   unless stamp.is_a?(Time)
  validation_error(:description, "missing")   unless description
  # Uses the class constant rather than a repeated literal. The comparison is
  # strict, so a token of exactly MIN_TOKEN_LENGTH characters is still rejected.
  # NOTE(review): confirm whether >= was intended; kept strict to preserve behaviour.
  validation_error(:description, "too short") unless desc_to_token.length > MIN_TOKEN_LENGTH
  validation_error(:tasks,       "missing")   if tasks.empty?
  true
end

#validation_error(field, reason) ⇒ Object

Raises:

  • (Hekenga::Invalid)


190
191
192
# File 'lib/hekenga/migration.rb', line 190

# Raises a Hekenga::Invalid describing which field failed validation and why.
#
# @param field [Symbol] name of the offending field
# @param reason [String] short explanation, e.g. "missing"
# @raise [Hekenga::Invalid] always
def validation_error(field, reason)
  failure = Hekenga::Invalid.new(self, field, reason)
  raise failure
end