Module: PgVersions

Defined in:
lib/pg_versions/pg_versions.rb,
lib/pg_versions.rb,
lib/pg_versions/rails.rb,
lib/pg_versions/version.rb

Overview

TODO: prepared statements? TODO: use ractor instead of thread for event listening

Defined Under Namespace

Classes: Connection, ConnectionClosed, ConnectionInner, Engine, InvalidParameters, Notification

Constant Summary collapse

VERSION =
"3.0"

Class Method Summary collapse

Class Method Details

.bump(connection, *channels) ⇒ Object

TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks



145
146
147
148
149
150
151
152
153
154
# File 'lib/pg_versions/pg_versions.rb', line 145

def self.bump(connection, *channels)
	channels = [channels].flatten.sort
	PgVersions.with_connection(connection, false) { |pg_connection|
		sql = self.bump_sql(*channels)
		return {} if sql == ""
		pg_connection.exec(sql) { |result|
			result.map { |row| [channels[Integer(row["i"])], string_to_version(row["version"])] }.to_h
		}
	}
end

.bump_sql(*channels) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/pg_versions/pg_versions.rb', line 113

def self.bump_sql(*channels)
	channels = [channels].flatten.sort
	return ""  if channels.size == 0
	encoder = PG::TextEncoder::QuotedLiteral.new(elements_type: PG::TextEncoder::String.new)
	quoted_channels = channels.map.with_index { |channel, i| "(#{i},#{encoder.encode(channel)})" }.join(", ")
	# table-wide share lock is there to mutually exclude table cleaner
	# clock_timestamp() - this has to be a timestamp after table lock got acquired
	"
		LOCK TABLE pg_versions IN ACCESS SHARE MODE;
		WITH
			to_bump(i, channel) AS (VALUES #{quoted_channels})
			, current_instant(ts) AS (VALUES (clock_timestamp()))
			, updated AS (
					INSERT INTO pg_versions(channel, instant, counter)
					SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
					ON CONFLICT (channel) DO UPDATE SET
						instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
						counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
					RETURNING channel, instant, pg_versions.counter
					)
		SELECT DISTINCT
			i
			, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text AS version
			, pg_notify(updated.channel::text, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text)::text
		FROM
			to_bump
			JOIN updated ON to_bump.channel = updated.channel;
	"
end

.create_table(connection) ⇒ Object



95
96
97
98
99
100
101
# File 'lib/pg_versions/pg_versions.rb', line 95

def self.create_table(connection)
	PgVersions.with_connection(connection, false) { |pg_connection|
		open(File.dirname(__FILE__)+"/../../create-table.sql") { |sql_file|
			pg_connection.exec sql_file.read
		}
	}
end

.drop_table(connection) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/pg_versions/pg_versions.rb', line 104

def self.drop_table(connection)
	PgVersions.with_connection(connection, false) { |pg_connection|
		open(File.dirname(__FILE__)+"/../../drop-table.sql") { |sql_file|
			pg_connection.exec sql_file.read
		}
	}
end

.read(connection, *channels) ⇒ Object

TODO: bump in the same query instead of calling bump TODO: do we really need to bump though? TODO: and then, implement read_sql



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/pg_versions/pg_versions.rb', line 160

def self.read(connection, *channels)
	PgVersions.with_connection(connection, false) { |pg_connection|
		channels = [channels].flatten.sort
		return {} if channels.size == 0
		versions = {}
		quoted_channels = channels.map.with_index { |channel, i| "(#{i},'#{pg_connection.escape_string(channel)}')" }.join(", ")
		not_found_channels = pg_connection.exec("
			LOCK TABLE pg_versions IN ACCESS SHARE MODE;
			WITH
				channels(i, channel) AS (VALUES #{quoted_channels})
			SELECT
				i
				, #{timestamp_to_integers('instant')} || ',' || counter AS version
			FROM
				channels
				JOIN pg_versions ON pg_versions.channel = channels.channel
			ORDER BY
				i DESC;
		") { |result|
			result.each { |row|
				versions[channels.delete_at(Integer(row["i"]))] = string_to_version(row["version"])
			}
		}
		versions.merge!(self.bump(pg_connection, channels))  if channels.size > 0
		versions
	}
end

.string_to_version(version_str) ⇒ Object



90
91
92
# File 'lib/pg_versions/pg_versions.rb', line 90

def self.string_to_version(version_str)
	version_str.split(",").map { |str| Integer(str) }
end

.timestamp_to_integers(input) ⇒ Object



40
41
42
# File 'lib/pg_versions/pg_versions.rb', line 40

def self.timestamp_to_integers(input)
	"to_char(%s, 'YYYYMMDD')::integer || ',' || to_char(%s, 'HH24MISS')::integer || ',' || to_char(%s, 'US')::integer"%[input, input, input]
end

.with_connection(pg_connection_param, reset, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pg_versions/pg_versions.rb', line 45

def self.with_connection(pg_connection_param, reset, &block)
	if pg_connection_param.kind_of? ::PG::Connection
		if reset
			pg_connection_param.sync_reset
			pg_connection_param.exec("select;")
		end
		block.call(pg_connection_param)
	elsif pg_connection_param.respond_to? :call
		pg_connection_param.call(reset, &block)
	elsif pg_connection_param.kind_of?(String) or pg_connection_param.kind_of?(Hash)
		Thread.handle_interrupt(Object => :never) {
			begin
				pg_connection = nil
				Thread.handle_interrupt(Object => :immediate) {
					pg_connection = ::PG.connect(pg_connection_param)
					block.call(pg_connection)
				}
			ensure
				pg_connection&.close
			end
		}
	elsif defined?(ActiveRecord) and pg_connection_param.kind_of?(Class) and pg_connection_param <= ActiveRecord::Base
		pg_connection = pg_connection_param.connection.raw_connection
		if reset
			pg_connection.sync_reset
			pg_connection.exec("select;")
		end
		block.call(pg_connection)
	else
		raise InvalidParameters, "Invalid connection parameter (#{pg_connection_param.inspect}). Either pass PG::Connection object, url string, hash, ActiveRecord::Base class (or subclass) or call one of the ActiveRecord methods that come with PgVersions refinement."
	end
end