Method: RR::ReplicationRun#run

Defined in:
lib/rubyrep/replication_run.rb

#runObject

Executes the replication run.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rubyrep/replication_run.rb', line 43

def run
  return unless [:left, :right].any? do |database|
    changes_pending = false
    t = Thread.new do
      changes_pending = session.send(database).select_one(
        "select id from #{session.configuration.options[:rep_prefix]}_pending_changes limit 1"
      ) != nil
    end
    t.join session.configuration.options[:database_connection_timeout]
    changes_pending
  end

  # Apparently sometimes above check for changes takes already so long, that
  # the replication run times out.
  # Check for this and if timed out, return (silently).
  return if sweeper.terminated?

  loaders = LoggedChangeLoaders.new(session)

  success = false
  begin
    replicator # ensure that replicator is created and has chance to validate settings

    loop do
      begin
        loaders.update # ensure the cache of change log records is up-to-date
        diff = ReplicationDifference.new loaders
        diff.load
        break unless diff.loaded?
        break if sweeper.terminated?
        if diff.type != :no_diff and not event_filtered?(diff)
          replicator.replicate_difference diff
        end
      rescue Exception => e
        begin
          helper.log_replication_outcome diff, e.message,
            e.class.to_s + "\n" + e.backtrace.join("\n")
        rescue Exception => _
          # if logging to database itself fails, re-raise the original exception
          raise e
        end
      end
    end
    success = true
  ensure
    if sweeper.terminated?
      helper.finalize false
      session.disconnect_databases
    else
      helper.finalize success
    end
  end
end