Module: PgVersions

Defined in:
lib/pg_versions/pg_versions.rb,
lib/pg_versions.rb,
lib/pg_versions/rails.rb,
lib/pg_versions/version.rb

Overview

TODO: prepared statements?

Defined Under Namespace

Classes: Connection, ConnectionAcquisitionFailedError, Engine, Notification

Constant Summary collapse

# Version string of the PgVersions library (presumably the released gem version — confirm against the gemspec).
VERSION =
"1.0"

Class Method Summary collapse

Class Method Details

.bump(*channels, connection: nil) ⇒ Object

TODO: Ensure this is called at most once per transaction, or that every transaction bumps its channels in the same order; otherwise concurrent transactions can deadlock.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/pg_versions/pg_versions.rb', line 77

# Bumps the version of every given channel and notifies listeners.
#
# Channel names are flattened, de-duplicated and sorted, then upserted into
# the pg_versions table in a single statement. For each channel the stored
# instant is advanced to the current clock_timestamp(); when the stored
# instant is not older than the new one, the counter is incremented instead
# of being reset to 0. The resulting version is sent with pg_notify on the
# channel and returned to the caller.
#
# TODO: ensure this is called only once per transaction, or that all bumps
# occur in the same order in all transactions, to avoid deadlocks
#
# @param channels [Array<String>] channel names; nested arrays are flattened
# @param connection [Object, nil] connection handed to PgVersions.with_connection
# @return [Hash] channel name => version as returned by string_to_version;
#   an empty hash when no channels were given (no query is issued then)
def self.bump(*channels, connection: nil)
	PgVersions.with_connection(connection) { |pg_connection|
		# uniq: a repeated channel would make the upsert below affect the same
		# row twice, which PostgreSQL rejects for ON CONFLICT DO UPDATE.
		channels = channels.flatten.uniq.sort
		return {} if channels.empty?
		# (index, 'escaped name') tuples; the index maps result rows back to channels
		quoted_channels = channels.each_with_index.map { |channel, i|
			"(#{i},'#{pg_connection.escape_string(channel)}')"
		}.join(", ")
		# table-wide share lock is there to mutually exclude table cleaner
		# clock_timestamp() - this has to be a timestamp after table lock got acquired
		rows = pg_connection.exec("
			LOCK TABLE pg_versions IN ACCESS SHARE MODE;
			WITH
				to_bump(i, channel) AS (VALUES #{quoted_channels})
				, current_instant(ts) AS (VALUES (clock_timestamp()))
				, updated AS (
						INSERT INTO pg_versions(channel, instant, counter)
						SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
						ON CONFLICT (channel) DO UPDATE SET
							instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
							counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
						RETURNING channel, instant, pg_versions.counter
						)
			SELECT DISTINCT
				i
				, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text AS version
				, pg_notify(updated.channel::text, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text)::text
			FROM
				to_bump
				JOIN updated ON to_bump.channel = updated.channel;
		")
		rows.map { |row| [channels[Integer(row["i"])], string_to_version(row["version"])] }.to_h
	}
end

.create_table(connection = nil) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/pg_versions/pg_versions.rb', line 60

# Creates the pg_versions schema by executing the create-table.sql script
# that lives two directories above this source file.
#
# @param connection [Object, nil] connection handed to PgVersions.with_connection
# @return [Object] whatever the connection's exec returns for the script
def self.create_table(connection=nil)
	PgVersions.with_connection(connection) { |pg_connection|
		# File.read instead of Kernel#open: the argument is always a plain file path
		sql_path = File.join(File.dirname(__FILE__), "..", "..", "create-table.sql")
		pg_connection.exec File.read(sql_path)
	}
end

.drop_table(connection = nil) ⇒ Object



68
69
70
71
72
73
74
# File 'lib/pg_versions/pg_versions.rb', line 68

# Removes the pg_versions schema by executing the drop-table.sql script
# that lives two directories above this source file.
#
# @param connection [Object, nil] connection handed to PgVersions.with_connection
# @return [Object] whatever the connection's exec returns for the script
def self.drop_table(connection=nil)
	PgVersions.with_connection(connection) { |pg_connection|
		# File.read instead of Kernel#open: the argument is always a plain file path
		sql_path = File.join(File.dirname(__FILE__), "..", "..", "drop-table.sql")
		pg_connection.exec File.read(sql_path)
	}
end

.read(*channels, connection: nil) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/pg_versions/pg_versions.rb', line 109

# Reads the current version of every given channel.
#
# Channel names are flattened, de-duplicated and sorted, then looked up in
# the pg_versions table. Channels that have no row yet are bumped (via bump)
# so that every requested channel ends up with a version in the result.
#
# @param channels [Array<String>] channel names; nested arrays are flattened
# @param connection [Object, nil] connection handed to PgVersions.with_connection
# @return [Hash] channel name => version as returned by string_to_version;
#   an empty hash when no channels were given (no query is issued then)
def self.read(*channels, connection: nil)
	PgVersions.with_connection(connection) { |pg_connection|
		# uniq: avoids querying the same channel twice and handing duplicates to bump
		channels = channels.flatten.uniq.sort
		return {} if channels.empty?
		# (index, 'escaped name') tuples; the index maps result rows back to channels
		quoted_channels = channels.each_with_index.map { |channel, i|
			"(#{i},'#{pg_connection.escape_string(channel)}')"
		}.join(", ")
		versions = {}
		pg_connection.exec("
			LOCK TABLE pg_versions IN ACCESS SHARE MODE;
			WITH
				channels(i, channel) AS (VALUES #{quoted_channels})
			SELECT
				i
				, #{timestamp_to_integers('instant')} || ',' || counter AS version
			FROM
				channels
				JOIN pg_versions ON pg_versions.channel = channels.channel
			ORDER BY
				i DESC;
		").each { |row|
			versions[channels[Integer(row["i"])]] = string_to_version(row["version"])
		}
		# channels without a stored row get their first version by bumping them
		missing_channels = channels.reject { |channel| versions.key?(channel) }
		#TODO: bump in the same query instead of calling bump
		versions.merge!(self.bump(missing_channels, connection: pg_connection)) unless missing_channels.empty?
		versions
	}
end

.string_to_version(version_str) ⇒ Object



55
56
57
# File 'lib/pg_versions/pg_versions.rb', line 55

# Parses a comma-separated version string into its integer components.
#
# Each component is parsed strictly as a decimal number, so a leading zero
# is not mistaken for an octal prefix ("010" is 10, "08" is 8).
#
# @param version_str [String] e.g. "20240101,120000,5,0"
# @return [Array<Integer>] the components in their original order
# @raise [ArgumentError] if a component is not a valid decimal integer
def self.string_to_version(version_str)
	version_str.split(",").map { |component| Integer(component, 10) }
end

.timestamp_to_integers(input) ⇒ Object



37
38
39
# File 'lib/pg_versions/pg_versions.rb', line 37

# Builds an SQL expression that renders a timestamp expression as three
# comma-separated integers: date (YYYYMMDD), time of day (HH24MISS) and
# microseconds (US).
#
# @param input [#to_s] SQL expression yielding the timestamp; interpolated verbatim
# @return [String] SQL text fragment
def self.timestamp_to_integers(input)
	%w[YYYYMMDD HH24MISS US]
		.map { |pattern| "to_char(#{input}, '#{pattern}')::integer" }
		.join(" || ',' || ")
end

.with_connection(pg_connection) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/pg_versions/pg_versions.rb', line 42

# Yields a connection to the given block.
#
# An explicitly passed connection is yielded as-is. Otherwise, when
# ActiveRecord is loaded, a connection is checked out of
# ActiveRecord::Base.connection_pool for the duration of the block and the
# adapter's underlying driver connection is yielded.
#
# @param pg_connection [Object, nil] connection to use, or nil to borrow one from ActiveRecord
# @yieldparam connection [Object] the connection to run queries on
# @return [Object] the block's return value
# @raise [ConnectionAcquisitionFailedError] when no connection is given and ActiveRecord is not loaded
def self.with_connection(pg_connection)
	if pg_connection
		yield pg_connection
	elsif defined? ActiveRecord
		ActiveRecord::Base.connection_pool.with_connection { |ar_connection|
			# Public accessor instead of reading the adapter's private @connection
			# ivar, whose name is an implementation detail of ActiveRecord.
			yield ar_connection.raw_connection
		}
	else
		raise ConnectionAcquisitionFailedError, "Missing connection. Either pass pg connection object or import ActiveRecord."
	end
end