Class: RR::ReplicationRun
- Inherits: Object
  - Object
    - RR::ReplicationRun
- Defined in: lib/rubyrep/replication_run.rb
Overview
Executes a single replication run
Instance Attribute Summary
- #session ⇒ Object
  The current Session object.
- #sweeper ⇒ Object
  The current TaskSweeper.
Instance Method Summary
- #event_filtered?(diff) ⇒ Boolean
  Calls the event filter for the given difference.
- #helper ⇒ Object
  Returns the current ReplicationHelper; creates it if necessary.
- #initialize(session, sweeper) ⇒ ReplicationRun (constructor)
  Creates a new ReplicationRun instance.
- #install_sweeper ⇒ Object
  Installs the current sweeper into the database connections.
- #load_difference ⇒ Object
  Returns the next available ReplicationDifference.
- #replicator ⇒ Object
  Returns the current replicator; creates it if necessary.
- #run ⇒ Object
  Executes the replication run.
- #second_chancers ⇒ Object
  An array of ReplicationDifference objects that failed replication once but should be tried one more time.
Constructor Details
#initialize(session, sweeper) ⇒ ReplicationRun
Creates a new ReplicationRun instance.
- session: the current Session
- sweeper: the current TaskSweeper
    # File 'lib/rubyrep/replication_run.rb', line 136
    def initialize(session, sweeper)
      self.session = session
      self.sweeper = sweeper
      install_sweeper
    end
Instance Attribute Details
#session ⇒ Object
The current Session object
    # File 'lib/rubyrep/replication_run.rb', line 9
    def session
      @session
    end
#sweeper ⇒ Object
The current TaskSweeper
    # File 'lib/rubyrep/replication_run.rb', line 12
    def sweeper
      @sweeper
    end
Instance Method Details
#event_filtered?(diff) ⇒ Boolean
Calls the event filter for the given difference.
- diff: instance of ReplicationDifference
Returns true if replication of the difference should not proceed.
    # File 'lib/rubyrep/replication_run.rb', line 33
    def event_filtered?(diff)
      event_filter = helper.options_for_table(diff.changes[:left].table)[:event_filter]
      if event_filter && event_filter.respond_to?(:before_replicate)
        not event_filter.before_replicate(
          diff.changes[:left].table,
          helper.type_cast(diff.changes[:left].table, diff.changes[:left].key),
          helper,
          diff
        )
      else
        false
      end
    end
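The filter contract described above can be sketched in isolation. The class `SkipAuditTable` and the method `filtered?` below are hypothetical names made up for illustration; they mirror only the `respond_to?` check and the negation of the filter's return value, and pass `nil` where the real method passes the helper and the difference.

```ruby
# Hypothetical event filter: returning false from before_replicate
# means "do not replicate this difference".
class SkipAuditTable
  def before_replicate(table, key, helper, diff)
    table != 'audit_log'
  end
end

# Standalone stand-in for the decision logic of event_filtered?
# (illustrative only; not the rubyrep implementation).
def filtered?(event_filter, table, key)
  if event_filter && event_filter.respond_to?(:before_replicate)
    # The filter answers "may I replicate?", so the result is inverted.
    !event_filter.before_replicate(table, key, nil, nil)
  else
    # No filter, or a filter without the hook: nothing is filtered out.
    false
  end
end

filter = SkipAuditTable.new
audit_filtered = filtered?(filter, 'audit_log', { 'id' => 1 })
users_filtered = filtered?(filter, 'users', { 'id' => 1 })
```

Under these assumptions, a difference on `audit_log` is filtered out, while a difference on any other table proceeds to replication.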
#helper ⇒ Object
Returns the current ReplicationHelper; creates it if necessary
    # File 'lib/rubyrep/replication_run.rb', line 20
    def helper
      @helper ||= ReplicationHelper.new(self)
    end
#install_sweeper ⇒ Object
Installs the current sweeper into the database connections
    # File 'lib/rubyrep/replication_run.rb', line 124
    def install_sweeper
      [:left, :right].each do |database|
        unless session.send(database).respond_to?(:sweeper)
          session.send(database).send(:extend, NoisyConnection)
        end
        session.send(database).sweeper = sweeper
      end
    end
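The listing relies on extending individual objects at runtime. A minimal sketch of that pattern follows; `SweeperAware` is a hypothetical stand-in for `NoisyConnection`, and `FakeConnection` and `install` are likewise invented for illustration.

```ruby
# Hypothetical stand-in for NoisyConnection: adds a sweeper accessor.
module SweeperAware
  attr_accessor :sweeper
end

# Hypothetical connection object that has no sweeper accessor by default.
FakeConnection = Struct.new(:name)

def install(connections, sweeper)
  connections.each do |connection|
    # Extend only this instance, and only if it lacks the accessor.
    connection.extend(SweeperAware) unless connection.respond_to?(:sweeper)
    connection.sweeper = sweeper
  end
end

left = FakeConnection.new('left')
right = FakeConnection.new('right')
install([left, right], :shared_sweeper)
```

Because `extend` affects a single instance, other objects of the same class stay untouched, which is presumably why the original checks `respond_to?(:sweeper)` per connection.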
#load_difference ⇒ Object
Returns the next available ReplicationDifference: a new, unprocessed difference if one exists, otherwise the first queued 'second chancer'.
    # File 'lib/rubyrep/replication_run.rb', line 50
    def load_difference
      @loaders ||= LoggedChangeLoaders.new(session)
      @loaders.update # ensure the cache of change log records is up-to-date
      diff = ReplicationDifference.new @loaders
      diff.load
      unless diff.loaded? or second_chancers.empty?
        diff = second_chancers.shift
      end
      diff
    end
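The fallback rule in this method can be sketched without a database. `Diff` and `next_difference` are hypothetical names; the sketch assumes only that a difference answers `loaded?`.

```ruby
# Hypothetical difference object; `loaded` says whether a change was found.
Diff = Struct.new(:label, :loaded) do
  def loaded?
    loaded
  end
end

# Prefer a freshly loaded difference; otherwise hand out the oldest
# queued second chancer, if there is one.
def next_difference(fresh, second_chancers)
  diff = fresh
  unless diff.loaded? || second_chancers.empty?
    diff = second_chancers.shift
  end
  diff
end

queue = [Diff.new('retry', true)]
first  = next_difference(Diff.new('new', true), queue)    # fresh one wins
second = next_difference(Diff.new('none', false), queue)  # falls back to the queue
third  = next_difference(Diff.new('none', false), queue)  # queue empty: unloaded diff
```

An unloaded difference is returned only when both sources are exhausted, which is the condition the caller uses to end its loop.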
#replicator ⇒ Object
Returns the current replicator; creates it if necessary.
    # File 'lib/rubyrep/replication_run.rb', line 25
    def replicator
      @replicator ||=
        Replicators.replicators[session.configuration.options[:replicator]].new(helper)
    end
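The lookup combines a registry keyed by a configuration value with `||=` memoization. The sketch below uses a hypothetical registry `REPLICATORS` and hypothetical classes `TwoWaySketch` and `RunSketch`; it does not reproduce the rubyrep `Replicators` module.

```ruby
# Hypothetical registry mapping configuration keys to replicator classes.
REPLICATORS = {}

class TwoWaySketch
  attr_reader :helper

  def initialize(helper)
    @helper = helper
  end
end
REPLICATORS[:two_way] = TwoWaySketch

class RunSketch
  def initialize(options)
    @options = options
  end

  def replicator
    # ||= builds the instance on first access and reuses it afterwards.
    @replicator ||= REPLICATORS[@options[:replicator]].new(:helper_placeholder)
  end
end

run_sketch = RunSketch.new(replicator: :two_way)
first_call = run_sketch.replicator
second_call = run_sketch.replicator
```

Both calls return the same object, so any validation done in the replicator's constructor happens once per run.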
#run ⇒ Object
Executes the replication run.
    # File 'lib/rubyrep/replication_run.rb', line 62
    def run
      return unless [:left, :right].any? do |database|
        changes_pending = false
        t = Thread.new do
          changes_pending = session.send(database).select_one(
            "select id from #{session.configuration.options[:rep_prefix]}_pending_changes limit 1"
          ) != nil
        end
        t.join session.configuration.options[:database_connection_timeout]
        changes_pending
      end

      # The check for pending changes above can itself take so long that
      # the replication run times out.
      # Check for this and, if timed out, return (silently).
      return if sweeper.terminated?

      success = false
      begin
        replicator # ensure that the replicator is created and has a chance to validate settings

        loop do
          begin
            diff = load_difference
            break unless diff.loaded?
            break if sweeper.terminated?
            if diff.type != :no_diff and not event_filtered?(diff)
              replicator.replicate_difference diff
            end
          rescue Exception => e
            if e.message =~ /violates foreign key constraint|foreign key constraint fails/i and !diff.second_chance?
              # Note:
              # Identifying the foreign key constraint violation via regular expression is
              # database dependent and *dirty*.
              # It would be better to use the ActiveRecord #translate_exception mechanism.
              # However, as of version 3.0.5 this does not yet work properly.
              diff.second_chance = true
              second_chancers << diff
            else
              begin
                helper.log_replication_outcome diff, e.message,
                  e.class.to_s + "\n" + e.backtrace.join("\n")
              rescue Exception => _
                # if logging to the database itself fails, re-raise the original exception
                raise e
              end
            end
          end
        end
        success = true
      ensure
        if sweeper.terminated?
          helper.finalize false
          session.disconnect_databases
        else
          helper.finalize success
        end
      end
    end
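The pending-changes check at the top of `run` wraps a query in a thread and waits for it with a bounded `Thread#join`. A minimal sketch of that pattern, using a hypothetical helper `check_with_timeout` and `sleep` in place of a slow database query:

```ruby
# Runs the block in a thread and waits at most `timeout` seconds.
# Returns the block's result, or `default` if it did not finish in time.
def check_with_timeout(timeout, default = false)
  result = default
  worker = Thread.new { result = yield }
  worker.join(timeout) # returns nil on timeout; the thread keeps running
  result
end

fast = check_with_timeout(1) { true }
slow = check_with_timeout(0.05) { sleep 0.5; true }
```

Note that `join` with a timeout does not stop the worker thread. The sketch simply ignores a late result, much as the original treats a slow check as "no changes pending" for that database.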
#second_chancers ⇒ Object
An array of ReplicationDifference objects that failed replication once but should be tried one more time.
    # File 'lib/rubyrep/replication_run.rb', line 15
    def second_chancers
      @second_chancers ||= []
    end
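The purpose of this queue is easiest to see with changes that arrive in the wrong order, such as a child row before its parent. The sketch below is a simplified model under stated assumptions: `ForeignKeyError`, `Change`, and `replicate_all` are hypothetical, and a missing parent table stands in for a real foreign key violation reported by the database.

```ruby
# Hypothetical error standing in for a database foreign key violation.
class ForeignKeyError < StandardError; end

Change = Struct.new(:table, :parent, :second_chance)

# Applies changes in order; a change whose parent is missing is queued
# once and retried after the remaining changes have been applied.
def replicate_all(changes)
  applied = []
  failed = []
  second_chancers = []
  pending = changes.dup
  loop do
    change = pending.shift || second_chancers.shift
    break unless change
    begin
      if change.parent && !applied.include?(change.parent)
        raise ForeignKeyError, 'foreign key constraint fails'
      end
      applied << change.table
    rescue ForeignKeyError
      if change.second_chance
        failed << change.table # second failure: give up and record it
      else
        change.second_chance = true
        second_chancers << change
      end
    end
  end
  [applied, failed]
end

applied, failed = replicate_all([
  Change.new('orders', 'customers', false),
  Change.new('customers', nil, false),
  Change.new('items', 'products', false)
])
```

Here `orders` succeeds on its second attempt because `customers` has been applied in the meantime, while `items` fails twice and is recorded as failed.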