Module: PgVersions
Defined in:
- lib/pg_versions/pg_versions.rb
- lib/pg_versions.rb
- lib/pg_versions/rails.rb
- lib/pg_versions/version.rb
Overview
TODO: prepared statements?
Defined Under Namespace
Classes: Connection, ConnectionAcquisitionFailedError, Engine, Notification
Constant Summary collapse
- VERSION = "1.0"
Class Method Summary collapse
- .bump(*channels, connection: nil) ⇒ Object
  TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks.
- .create_table(connection = nil) ⇒ Object
- .drop_table(connection = nil) ⇒ Object
- .read(*channels, connection: nil) ⇒ Object
- .string_to_version(version_str) ⇒ Object
- .timestamp_to_integers(input) ⇒ Object
- .with_connection(pg_connection) ⇒ Object
Class Method Details
.bump(*channels, connection: nil) ⇒ Object
TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/pg_versions/pg_versions.rb', line 77
# Bumps the version of every given channel and notifies listeners.
#
# The channel names are flattened, de-duplicated and sorted, then upserted
# into the pg_versions table in one statement. Each resulting version is
# also sent through pg_notify on the channel of the same name.
#
# @param channels [Array<String>] channel names; nested arrays are flattened
# @param connection [Object, nil] passed to PgVersions.with_connection
# @return [Hash] channel name => version as returned by string_to_version;
#   empty when no channels were given
def self.bump(*channels, connection: nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # uniq: INSERT ... ON CONFLICT DO UPDATE may not affect the same row twice
    # within one statement, so a repeated channel name would make it fail.
    channels = [channels].flatten.uniq.sort
    return {} if channels.empty?

    # The index travels with each channel so result rows can be mapped back
    # to the Ruby-side names.
    quoted_channels = channels.map.with_index { |channel, i|
      "(#{i},'#{pg_connection.escape_string(channel)}')"
    }.join(", ")

    version_sql = "#{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text"

    # table-wide share lock is there to mutually exclude table cleaner
    # clock_timestamp() - this has to be a timestamp after table lock got acquired
    rows = pg_connection.exec(<<~SQL)
      LOCK TABLE pg_versions IN ACCESS SHARE MODE;
      WITH
        to_bump(i, channel) AS (VALUES #{quoted_channels})
        , current_instant(ts) AS (VALUES (clock_timestamp()))
        , updated AS (
          INSERT INTO pg_versions(channel, instant, counter)
          SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
          ON CONFLICT (channel) DO UPDATE SET
            instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
            counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
          RETURNING channel, instant, pg_versions.counter
        )
      SELECT DISTINCT
        i
        , #{version_sql} AS version
        , pg_notify(updated.channel::text, #{version_sql})::text
      FROM to_bump
      JOIN updated ON to_bump.channel = updated.channel;
    SQL

    rows.map { |row|
      [channels[Integer(row["i"])], string_to_version(row["version"])]
    }.to_h
  end
end
.create_table(connection = nil) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/pg_versions/pg_versions.rb', line 60
# Executes the create-table.sql script located two directories above this
# file on the given (or acquired) connection.
#
# @param connection [Object, nil] passed to PgVersions.with_connection
# @return [Object] whatever the connection's exec returns for the script
def self.create_table(connection=nil)
  # File.read instead of Kernel#open: only a plain file read is wanted, and
  # the handle is closed without needing a block.
  sql_path = File.expand_path("../../create-table.sql", __dir__)
  PgVersions.with_connection(connection) do |pg_connection|
    pg_connection.exec(File.read(sql_path))
  end
end
.drop_table(connection = nil) ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/pg_versions/pg_versions.rb', line 68
# Executes the drop-table.sql script located two directories above this
# file on the given (or acquired) connection.
#
# @param connection [Object, nil] passed to PgVersions.with_connection
# @return [Object] whatever the connection's exec returns for the script
def self.drop_table(connection=nil)
  # File.read instead of Kernel#open: only a plain file read is wanted, and
  # the handle is closed without needing a block.
  sql_path = File.expand_path("../../drop-table.sql", __dir__)
  PgVersions.with_connection(connection) do |pg_connection|
    pg_connection.exec(File.read(sql_path))
  end
end
.read(*channels, connection: nil) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/pg_versions/pg_versions.rb', line 109
# Reads the current version of every given channel.
#
# Channels that have no row in pg_versions are passed to bump, and the
# versions bump returns for them are merged into the result.
#
# @param channels [Array<String>] channel names; nested arrays are flattened
# @param connection [Object, nil] passed to PgVersions.with_connection
# @return [Hash] channel name => version as returned by string_to_version;
#   empty when no channels were given
def self.read(*channels, connection: nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # uniq keeps a repeated name from reaching bump twice in one statement.
    channels = [channels].flatten.uniq.sort
    return {} if channels.empty?

    # The index travels with each channel so result rows can be mapped back
    # to the Ruby-side names.
    quoted_channels = channels.map.with_index { |channel, i|
      "(#{i},'#{pg_connection.escape_string(channel)}')"
    }.join(", ")

    rows = pg_connection.exec(<<~SQL)
      LOCK TABLE pg_versions IN ACCESS SHARE MODE;
      WITH channels(i, channel) AS (VALUES #{quoted_channels})
      SELECT
        i
        , #{timestamp_to_integers('instant')} || ',' || counter AS version
      FROM channels
      JOIN pg_versions ON pg_versions.channel = channels.channel
      ORDER BY i DESC;
    SQL

    versions = {}
    rows.each do |row|
      versions[channels[Integer(row["i"])]] = string_to_version(row["version"])
    end

    # Anything the query did not return has no row yet.
    missing = channels - versions.keys

    #TODO: bump in the same query instead of calling bump
    versions.merge!(bump(missing, connection: pg_connection)) unless missing.empty?
    versions
  end
end
.string_to_version(version_str) ⇒ Object
55 56 57 |
# File 'lib/pg_versions/pg_versions.rb', line 55
# Converts a comma-separated version string into an array of integers.
#
# @param version_str [String] e.g. "20240131,235959,123456,7"
# @return [Array<Integer>] one integer per comma-separated field
# @raise [ArgumentError] if a field is not a decimal integer
def self.string_to_version(version_str)
  # Explicit base 10: without it Integer() honours radix prefixes, so "010"
  # would be read as octal 8 and "08" would raise.
  version_str.split(",").map { |field| Integer(field, 10) }
end
.timestamp_to_integers(input) ⇒ Object
37 38 39 |
# File 'lib/pg_versions/pg_versions.rb', line 37
# Builds a SQL expression that renders a timestamp as three comma-separated
# integers: date (YYYYMMDD), time of day (HH24MISS) and microseconds (US).
#
# @param input [String] SQL expression to format; it is inserted into the
#   result verbatim, without quoting
# @return [String] SQL text concatenating the three to_char(...)::integer parts
def self.timestamp_to_integers(input)
  %w[YYYYMMDD HH24MISS US]
    .map { |pattern| "to_char(#{input}, '#{pattern}')::integer" }
    .join(" || ',' || ")
end
.with_connection(pg_connection) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/pg_versions/pg_versions.rb', line 42
# Yields a connection to the block and returns the block's value.
#
# An explicitly passed connection is yielded as is. Otherwise, when
# ActiveRecord is loaded, an adapter is taken from ActiveRecord's
# connection pool for the duration of the block and its underlying driver
# connection is yielded.
#
# @param pg_connection [Object, nil] connection to use, or nil to borrow one
#   from ActiveRecord
# @yieldparam connection [Object] the connection to run statements on
# @return [Object] the value returned by the block
# @raise [ConnectionAcquisitionFailedError] if no connection was passed and
#   ActiveRecord is not defined
def self.with_connection(pg_connection)
  return yield(pg_connection) if pg_connection

  unless defined?(ActiveRecord)
    raise ConnectionAcquisitionFailedError, "Missing connection. Either pass pg connection object or import ActiveRecord."
  end

  ActiveRecord::Base.connection_pool.with_connection do |ar_connection|
    # raw_connection is the adapter's public accessor for the driver object;
    # reading the @connection instance variable depends on adapter internals.
    yield ar_connection.raw_connection
  end
end