Module: PG::Replication

Defined in:
lib/pg/replication.rb,
lib/pg/replication/buffer.rb,
lib/pg/replication/version.rb,
lib/pg/replication/protocol.rb,
lib/pg/replication/pg_output.rb

Defined Under Namespace

Modules: PGOutput, Protocol Classes: Buffer, StreamError

Constant Summary collapse

DEFAULT_QUEUE_SIZE =
10_000
StreamEnd =
Object.new.freeze
POSTGRES_EPOCH =
Time.utc(2000, 1, 1).freeze
POSTGRES_EPOCH_USECS =
(POSTGRES_EPOCH.to_r * 1_000_000).to_i
VERSION =
"0.0.8"

Instance Method Summary collapse

Instance Method Details

#confirmed_slot_lsn(slot) ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/pg/replication.rb', line 92

def confirmed_slot_lsn(slot)
  lsn = query(<<~SQL).getvalue(0, 0)
    SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '#{slot}'
  SQL
  high, low = lsn.split("/")
  (high.to_i(16) << 32) + low.to_i(16)
rescue StandardError
  nil
end

#last_confirmed_lsnObject



76
77
78
# File 'lib/pg/replication.rb', line 76

def last_confirmed_lsn
  status_update_mutex.synchronize { @last_confirmed_lsn }
end

#standby_status_update(write_lsn:, flush_lsn: write_lsn, apply_lsn: write_lsn, timestamp: Time.now, reply: false) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/pg/replication.rb', line 53

def standby_status_update(
  write_lsn:,
  flush_lsn: write_lsn,
  apply_lsn: write_lsn,
  timestamp: Time.now,
  reply: false
)
  msg = [
    "r".bytes.first,
    write_lsn,
    flush_lsn,
    apply_lsn,
    ((timestamp.to_r - POSTGRES_EPOCH.to_r) * 1_000_000).to_i,
    reply ? 1 : 0,
  ].pack("CQ>Q>Q>Q>C")

  status_update_mutex.synchronize do
    put_copy_data(msg)
    flush
    @last_confirmed_lsn = [@last_confirmed_lsn, write_lsn].compact.max
  end
end

#start_pgoutput_replication_slot(slot, publication_names, **kwargs) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pg/replication.rb', line 38

def start_pgoutput_replication_slot(slot, publication_names, **kwargs)
  publication_names = publication_names.join(",")

  start_replication_slot(slot, **kwargs.merge(proto_version: "1", publication_names:))
    .map do |msg|
      case msg
      in Protocol::XLogData(data:, lsn:)
        data = data.force_encoding(internal_encoding)
        msg.with(data: PGOutput.read_message(Buffer.from_string(data)))
      else
        msg
      end
    end
end

#start_replication_slot(slot, logical: true, auto_keep_alive: true, location: "0/0", queue_size: DEFAULT_QUEUE_SIZE, **params) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/pg/replication.rb', line 17

def start_replication_slot(slot, logical: true, auto_keep_alive: true, location: "0/0", queue_size: DEFAULT_QUEUE_SIZE, **params)
  keep_alive_secs = wal_receiver_status_interval
  @last_confirmed_lsn = confirmed_slot_lsn(slot) || 0

  start_query = "START_REPLICATION SLOT #{slot} #{logical ? "LOGICAL" : "PHYSICAL"} #{location}"
  unless params.empty?
    start_query << "("
    start_query << params
      .map { |k, v| "#{quote_ident(k.to_s)} '#{escape_string(v.to_s)}'" }
      .join(", ")
    start_query << ")"
  end
  query(start_query)

  if auto_keep_alive
    start_threaded_replication(keep_alive_secs, queue_size)
  else
    start_sync_replication
  end
end

#stop_replicationObject



80
81
82
83
84
# File 'lib/pg/replication.rb', line 80

def stop_replication
  status_update_mutex.synchronize do
    put_copy_end
  end
end

#wal_receiver_status_intervalObject



86
87
88
89
90
# File 'lib/pg/replication.rb', line 86

def wal_receiver_status_interval
  query(<<~SQL).getvalue(0, 0)&.to_i || 10
    SELECT setting FROM pg_catalog.pg_settings WHERE name = 'wal_receiver_status_interval'
  SQL
end