Class: Hekenga::Migration

Inherits:
Object
  • Object
show all
Defined in:
lib/hekenga/migration.rb

Constant Summary collapse

MIN_TOKEN_LENGTH =

Validations

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMigration

Returns a new instance of Migration.



15
16
17
18
19
# File 'lib/hekenga/migration.rb', line 15

def initialize
  @tasks      = []
  @logs       = {}
  @batch_size = 25
end

Instance Attribute Details

#active_idxObject

Returns the value of attribute active_idx.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

def active_idx
  @active_idx
end

#batch_sizeObject

Returns the value of attribute batch_size.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

def batch_size
  @batch_size
end

#descriptionObject

Returns the value of attribute description.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

def description
  @description
end

#sessionObject (readonly)

Returns the value of attribute session.



13
14
15
# File 'lib/hekenga/migration.rb', line 13

def session
  @session
end

#stampObject

Returns the value of attribute stamp.



12
13
14
# File 'lib/hekenga/migration.rb', line 12

def stamp
  @stamp
end

#tasksObject (readonly)

Returns the value of attribute tasks.



13
14
15
# File 'lib/hekenga/migration.rb', line 13

def tasks
  @tasks
end

#test_modeObject (readonly)

Returns the value of attribute test_mode.



13
14
15
# File 'lib/hekenga/migration.rb', line 13

def test_mode
  @test_mode
end

Instance Method Details

#create_log!(attrs = {}) ⇒ Object



46
47
48
49
50
51
# File 'lib/hekenga/migration.rb', line 46

def create_log!(attrs = {})
  @logs[@active_idx] = Hekenga::Log.create(attrs.merge(
    migration: self,
    task_idx:  @active_idx
  ))
end

#desc_to_tokenObject



26
27
28
# File 'lib/hekenga/migration.rb', line 26

def desc_to_token
  @desc_to_token ||= self.description.gsub(/[^A-Za-z]+/,"_").gsub(/(^_|_$)/,"")
end

#execute_document_task(task_idx, executor_key, records) ⇒ Object



163
164
165
166
167
168
169
170
171
172
# File 'lib/hekenga/migration.rb', line 163

def execute_document_task(task_idx, executor_key, records)
  task_record = Hekenga::DocumentTaskRecord.create(
    migration_key: to_key,
    task_idx: task_idx,
    executor_key: executor_key,
    test_mode: test_mode,
    ids: records.map(&:id)
  )
  Hekenga::DocumentTaskExecutor.new(task_record, records: records).run!
end

#filter_out_processed(task, task_idx, records) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/hekenga/migration.rb', line 153

def filter_out_processed(task, task_idx, records)
  return records if records.empty?

  selector = task_records(task_idx).in(ids: records.map(&:id))
  processed_ids = selector.pluck(:ids).flatten.to_set
  records.reject do |record|
    processed_ids.include?(record._id)
  end
end

#inspectObject



30
31
32
# File 'lib/hekenga/migration.rb', line 30

def inspect
  "<Hekenga::Migration #{self.to_key}>"
end

#log(task_idx = @active_idx) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/hekenga/migration.rb', line 38

def log(task_idx = @active_idx)
  raise "Missing task index" if task_idx.nil?
  @logs[task_idx] ||= Hekenga::Log.find_by(
    pkey: self.to_key,
    task_idx: task_idx
  )
end

#log_cancel!Object



183
184
185
186
# File 'lib/hekenga/migration.rb', line 183

def log_cancel!
  # Bypass the active transaction if there is one
  log.set_without_session({cancel: true, error: true, done: true, finished: Time.now})
end

#log_done!Object



113
114
115
# File 'lib/hekenga/migration.rb', line 113

def log_done!
  log.set_without_session({done: true, finished: Time.now})
end

#perform!Object



70
71
72
73
74
75
76
77
# File 'lib/hekenga/migration.rb', line 70

def perform!
  if Hekenga.status(self) == :naught
    Hekenga::MasterProcess.new(self).run!
  else
    Hekenga.log "This migration has already been run! Aborting."
    return false
  end
end

#perform_task!(task_idx) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/hekenga/migration.rb', line 79

def perform_task!(task_idx)
  task         = @tasks[task_idx] or return
  @active_idx  = task_idx
  case task
  when Hekenga::SimpleTask
    start_simple_task(task)
  when Hekenga::DocumentTask
    if task.parallel
      start_parallel_task(task, task_idx)
    else
      start_document_task(task, task_idx)
    end
  end
end

#performed?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/hekenga/migration.rb', line 62

def performed?
  !!log(self.tasks.length - 1).done
end

#performing?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/hekenga/migration.rb', line 58

def performing?
  Hekenga::Log.where(pkey: self.to_key, done: false).any?
end

#recover!Object



94
95
96
# File 'lib/hekenga/migration.rb', line 94

def recover!
  Hekenga::MasterProcess.new(self).recover!
end

#reload_logsObject

API



54
55
56
# File 'lib/hekenga/migration.rb', line 54

def reload_logs
  @logs = {}
end

#simple_failure!(error) ⇒ Object



174
175
176
177
178
179
180
181
# File 'lib/hekenga/migration.rb', line 174

def simple_failure!(error)
  log.add_failure({
    message:   error.to_s,
    backtrace: error.backtrace,
    simple:    true
  }, Hekenga::Failure::Error)
  log_cancel!
end

#start_document_task(task, task_idx, recover: false) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/hekenga/migration.rb', line 131

def start_document_task(task, task_idx, recover: false)
  create_log!
  records = []
  task_records(task_idx).delete_all unless recover
  executor_key = BSON::ObjectId.new
  Hekenga::MongoidIterator.new(scope: task.scope, cursor_timeout: task.cursor_timeout).each do |record|
    records.push(record)
    next unless records.length == (task.batch_size || batch_size)

    records = filter_out_processed(task, task_idx, records)
    next unless records.length == (task.batch_size || batch_size)

    execute_document_task(task_idx, executor_key, records)
    records = []
    return if log.cancel
  end
  records = filter_out_processed(task, task_idx, records)
  execute_document_task(task_idx, executor_key, records) if records.any?
  return if log.cancel
  log_done!
end

#start_parallel_task(task, task_idx) ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/hekenga/migration.rb', line 117

def start_parallel_task(task, task_idx)
  create_log!
  Hekenga::ParallelTask.new(
    migration: self,
    task:      task,
    task_idx:  task_idx,
    test_mode: test_mode
  ).start!
end

#start_simple_task(task) ⇒ Object

Internal perform methods



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/hekenga/migration.rb', line 99

def start_simple_task(task)
  create_log!
  begin
    @context = Hekenga::Context.new(test_mode: test_mode)
    task.up!(@context)
  rescue => e
    simple_failure!(e)
    return
  ensure
    @context = nil
  end
  log_done!
end

#task_records(task_idx) ⇒ Object



127
128
129
# File 'lib/hekenga/migration.rb', line 127

def task_records(task_idx)
  Hekenga::DocumentTaskRecord.where(migration_key: to_key, task_idx: task_idx)
end

#test_mode!Object



66
67
68
# File 'lib/hekenga/migration.rb', line 66

def test_mode!
  @test_mode = true
end

#timestampObject

Internal



22
23
24
# File 'lib/hekenga/migration.rb', line 22

def timestamp
  self.stamp.strftime("%Y-%m-%dT%H:%M")
end

#to_keyObject



34
35
36
# File 'lib/hekenga/migration.rb', line 34

def to_key
  @pkey ||= "#{timestamp}-#{desc_to_token}"
end

#validate!Object



195
196
197
198
199
200
201
# File 'lib/hekenga/migration.rb', line 195

def validate!
  validation_error(:stamp,       "missing")   unless self.stamp.is_a?(Time)
  validation_error(:description, "missing")   unless self.description
  validation_error(:description, "too short") unless self.desc_to_token.length > 5
  validation_error(:tasks,       "missing")   if self.tasks.length.zero?
  true
end

#validation_error(field, reason) ⇒ Object

Raises:



191
192
193
# File 'lib/hekenga/migration.rb', line 191

def validation_error(field, reason)
  raise Hekenga::Invalid.new(self, field, reason)
end