Module: PG::Replication
- Defined in:
- lib/pg/replication.rb,
lib/pg/replication/buffer.rb,
lib/pg/replication/version.rb,
lib/pg/replication/protocol.rb,
lib/pg/replication/pg_output.rb
Defined Under Namespace
Modules: PGOutput, Protocol
Classes: Buffer, StreamError
Constant Summary
collapse
- DEFAULT_QUEUE_SIZE =
10_000
- StreamEnd =
Object.new.freeze
- POSTGRES_EPOCH =
Time.utc(2000, 1, 1).freeze
- POSTGRES_EPOCH_USECS =
(POSTGRES_EPOCH.to_r * 1_000_000).to_i
- VERSION =
"0.0.8"
Instance Method Summary
collapse
-
#confirmed_slot_lsn(slot) ⇒ Object
-
#last_confirmed_lsn ⇒ Object
-
#standby_status_update(write_lsn:, flush_lsn: write_lsn, apply_lsn: write_lsn, timestamp: Time.now, reply: false) ⇒ Object
-
#start_pgoutput_replication_slot(slot, publication_names, **kwargs) ⇒ Object
-
#start_replication_slot(slot, logical: true, auto_keep_alive: true, location: "0/0", queue_size: DEFAULT_QUEUE_SIZE, **params) ⇒ Object
-
#stop_replication ⇒ Object
-
#wal_receiver_status_interval ⇒ Object
Instance Method Details
#confirmed_slot_lsn(slot) ⇒ Object
92
93
94
95
96
97
98
99
100
|
# File 'lib/pg/replication.rb', line 92
# Looks up the +confirmed_flush_lsn+ recorded for a replication slot and
# converts it from the textual "HIGH/LOW" hexadecimal form into one integer
# (high part shifted left by 32 bits plus the low part).
#
# @param slot [#to_s] name of the replication slot to look up
# @return [Integer, nil] the LSN as an integer, or +nil+ when the lookup
#   fails for any reason (query error, no matching row, NULL value)
def confirmed_slot_lsn(slot)
  # The name lands inside a single-quoted SQL literal, so it must be escaped;
  # a raw quote in +slot+ would otherwise terminate the literal.
  # NOTE(review): string escaping is used instead of bind parameters on the
  # assumption that this connection only accepts simple queries - confirm.
  lsn = query(<<~SQL).getvalue(0, 0)
    SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '#{escape_string(slot.to_s)}'
  SQL
  high, low = lsn.split("/")
  (high.to_i(16) << 32) + low.to_i(16)
rescue StandardError
  # Deliberately best effort: start_replication_slot falls back to 0 on nil.
  nil
end
|
#last_confirmed_lsn ⇒ Object
76
77
78
|
# File 'lib/pg/replication.rb', line 76
# Reads the most recently recorded confirmed LSN while holding the
# status-update mutex.
#
# @return [Object] the current value of +@last_confirmed_lsn+
def last_confirmed_lsn
  status_update_mutex.synchronize do
    @last_confirmed_lsn
  end
end
|
#standby_status_update(write_lsn:, flush_lsn: write_lsn, apply_lsn: write_lsn, timestamp: Time.now, reply: false) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/pg/replication.rb', line 53
# Builds an 'r' status message and sends it over the copy stream.
#
# The message is packed big-endian as: the byte 'r', the three LSNs as
# 64-bit values, the timestamp as microseconds relative to POSTGRES_EPOCH,
# and a one-byte reply flag. Sending, flushing and the bookkeeping of
# +@last_confirmed_lsn+ happen under the status-update mutex.
#
# @param write_lsn [Integer] LSN reported as written
# @param flush_lsn [Integer] LSN reported as flushed (defaults to +write_lsn+)
# @param apply_lsn [Integer] LSN reported as applied (defaults to +write_lsn+)
# @param timestamp [Time] time stamped into the message
# @param reply [Boolean] whether the reply flag byte is set to 1
# @return [Integer] the updated +@last_confirmed_lsn+
def standby_status_update(
  write_lsn:,
  flush_lsn: write_lsn,
  apply_lsn: write_lsn,
  timestamp: Time.now,
  reply: false
)
  usecs_since_epoch = ((timestamp.to_r - POSTGRES_EPOCH.to_r) * 1_000_000).to_i
  reply_flag = reply ? 1 : 0
  fields = ["r".ord, write_lsn, flush_lsn, apply_lsn, usecs_since_epoch, reply_flag]
  payload = fields.pack("CQ>Q>Q>Q>C")

  status_update_mutex.synchronize do
    put_copy_data(payload)
    flush
    # Never move the recorded position backwards; nil means "nothing yet".
    @last_confirmed_lsn = [@last_confirmed_lsn, write_lsn].compact.max
  end
end
|
#start_pgoutput_replication_slot(slot, publication_names, **kwargs) ⇒ Object
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/pg/replication.rb', line 38
# Starts replication on +slot+ with the options +proto_version: "1"+ and a
# comma-joined +publication_names+, then maps the resulting stream so that
# the payload of every Protocol::XLogData message is parsed with
# PGOutput.read_message. All other messages pass through untouched.
#
# @param slot [String] replication slot name, forwarded to
#   #start_replication_slot
# @param publication_names [Array<String>, String] one publication name or
#   a list of them; names are joined with "," without any quoting
# @param kwargs [Hash] extra keyword arguments forwarded to
#   #start_replication_slot (+proto_version+ and +publication_names+ are
#   overridden)
# @return [Object] whatever +#map+ on the underlying stream returns
def start_pgoutput_replication_slot(slot, publication_names, **kwargs)
  # Array() lets callers pass a single name without wrapping it in a list.
  publication_list = Array(publication_names).join(",")
  options = kwargs.merge(proto_version: "1", publication_names: publication_list)

  start_replication_slot(slot, **options).map do |msg|
    case msg
    in Protocol::XLogData(data:)
      # force_encoding retags the payload in place with the connection's
      # internal encoding before it is handed to the parser.
      payload = data.force_encoding(internal_encoding)
      msg.with(data: PGOutput.read_message(Buffer.from_string(payload)))
    else
      msg
    end
  end
end
|
#start_replication_slot(slot, logical: true, auto_keep_alive: true, location: "0/0", queue_size: DEFAULT_QUEUE_SIZE, **params) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/pg/replication.rb', line 17
# Issues a START_REPLICATION command for +slot+ and hands off to either the
# threaded or the synchronous replication loop.
#
# Before the command is sent, the keep-alive interval is read via
# #wal_receiver_status_interval and +@last_confirmed_lsn+ is seeded from
# #confirmed_slot_lsn (0 when that returns nil).
#
# @param slot [String] slot name, interpolated into the command as-is
# @param logical [Boolean] selects the LOGICAL (true) or PHYSICAL keyword
# @param auto_keep_alive [Boolean] true uses start_threaded_replication,
#   false uses start_sync_replication
# @param location [String] start position, interpolated as-is
# @param queue_size [Integer] passed to start_threaded_replication
# @param params [Hash] extra options; keys go through quote_ident and
#   values through escape_string, rendered as a parenthesised list
# @return [Object] the result of the selected replication starter
def start_replication_slot(slot, logical: true, auto_keep_alive: true, location: "0/0", queue_size: DEFAULT_QUEUE_SIZE, **params)
  keep_alive_secs = wal_receiver_status_interval
  @last_confirmed_lsn = confirmed_slot_lsn(slot) || 0

  mode = logical ? "LOGICAL" : "PHYSICAL"
  option_list = params.map do |name, value|
    "#{quote_ident(name.to_s)} '#{escape_string(value.to_s)}'"
  end

  command = "START_REPLICATION SLOT #{slot} #{mode} #{location}"
  command += "(#{option_list.join(", ")})" unless option_list.empty?
  query(command)

  return start_threaded_replication(keep_alive_secs, queue_size) if auto_keep_alive

  start_sync_replication
end
|
#stop_replication ⇒ Object
80
81
82
83
84
|
# File 'lib/pg/replication.rb', line 80
# Ends the copy stream by calling +put_copy_end+ while holding the
# status-update mutex.
#
# @return [Object] whatever +put_copy_end+ returns
def stop_replication
  status_update_mutex.synchronize { put_copy_end }
end
|
#wal_receiver_status_interval ⇒ Object
86
87
88
89
90
|
# File 'lib/pg/replication.rb', line 86
# Reads the +wal_receiver_status_interval+ setting from pg_settings.
#
# @return [Integer] the setting converted with +to_i+, or 10 when the
#   returned value is nil
def wal_receiver_status_interval
  setting = query(<<~SQL).getvalue(0, 0)
    SELECT setting FROM pg_catalog.pg_settings WHERE name = 'wal_receiver_status_interval'
  SQL
  setting&.to_i || 10
end
|