Module: PgVersions

Defined in:
lib/pg_versions/pg_versions.rb,
lib/pg_versions.rb,
lib/pg_versions/rails.rb,
lib/pg_versions/version.rb

Overview

TODO: prepared statements? TODO: use ractor instead of thread for event listening

Defined Under Namespace

Classes: Connection, ConnectionClosed, ConnectionInner, Engine, InvalidParameters, Notification

Constant Summary collapse

VERSION =
"3.3"

Class Method Summary collapse

Class Method Details

.bump(connection, *channels) ⇒ Object

TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks



147
148
149
150
151
152
153
154
155
156
# File 'lib/pg_versions/pg_versions.rb', line 147

def self.bump(connection, *channels)
  channels = [channels].flatten.sort.uniq
  PgVersions.with_connection(connection, false) { |pg_connection|
    sql = self.bump_sql(*channels)
    return {} if sql == ""
    pg_connection.exec(sql) { |result|
      result.map { |row| [channels[Integer(row["i"])], string_to_version(row["version"])] }.to_h
    }
  }
end

.bump_sql(*channels) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/pg_versions/pg_versions.rb', line 115

def self.bump_sql(*channels)
  channels = [channels].flatten
  return ""  if channels.size == 0
  encoder = PG::TextEncoder::QuotedLiteral.new(elements_type: PG::TextEncoder::String.new)
  quoted_channels = channels.map.with_index { |channel, i| "(#{i},#{encoder.encode(channel)})" }.join(", ")
  # table-wide share lock is there to mutually exclude table cleaner
  # clock_timestamp() - this has to be a timestamp after table lock got acquired
  "
    LOCK TABLE pg_versions IN ACCESS SHARE MODE;
    WITH
      to_bump(i, channel) AS (VALUES #{quoted_channels})
      , current_instant(ts) AS (VALUES (clock_timestamp()))
      , updated AS (
          INSERT INTO pg_versions(channel, instant, counter)
          SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
          ON CONFLICT (channel) DO UPDATE SET
            instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
            counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
          RETURNING channel, instant, pg_versions.counter
          )
    SELECT DISTINCT
      i
      , #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text AS version
      , pg_notify(updated.channel::text, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text)::text
    FROM
      to_bump
      JOIN updated ON to_bump.channel = updated.channel;
  "
end

.create_table(connection) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/pg_versions/pg_versions.rb', line 97

def self.create_table(connection)
  PgVersions.with_connection(connection, false) { |pg_connection|
    open(File.dirname(__FILE__)+"/../../create-table.sql") { |sql_file|
      pg_connection.exec sql_file.read
    }
  }
end

.drop_table(connection) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/pg_versions/pg_versions.rb', line 106

def self.drop_table(connection)
  PgVersions.with_connection(connection, false) { |pg_connection|
    open(File.dirname(__FILE__)+"/../../drop-table.sql") { |sql_file|
      pg_connection.exec sql_file.read
    }
  }
end

.read(connection, *channels) ⇒ Object

TODO: bump in the same query instead of calling bump TODO: do we really need to bump though? TODO: and then, implement read_sql



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/pg_versions/pg_versions.rb', line 162

def self.read(connection, *channels)
  PgVersions.with_connection(connection, false) { |pg_connection|
    channels = [channels].flatten.sort.uniq
    return {} if channels.size == 0
    versions = {}
    quoted_channels = channels.map.with_index { |channel, i| "(#{i},'#{pg_connection.escape_string(channel)}')" }.join(", ")
    not_found_channels = pg_connection.exec("
      LOCK TABLE pg_versions IN ACCESS SHARE MODE;
      WITH
        channels(i, channel) AS (VALUES #{quoted_channels})
      SELECT
        i
        , #{timestamp_to_integers('instant')} || ',' || counter AS version
      FROM
        channels
        JOIN pg_versions ON pg_versions.channel = channels.channel
      ORDER BY
        i DESC;
    ") { |result|
      result.each { |row|
        versions[channels.delete_at(Integer(row["i"]))] = string_to_version(row["version"])
      }
    }
    versions.merge!(self.bump(pg_connection, channels))  if channels.size > 0
    versions
  }
end

.string_to_version(version_str) ⇒ Object



92
93
94
# File 'lib/pg_versions/pg_versions.rb', line 92

def self.string_to_version(version_str)
  version_str.split(",").map { |str| Integer(str) }
end

.timestamp_to_integers(input) ⇒ Object



40
41
42
# File 'lib/pg_versions/pg_versions.rb', line 40

def self.timestamp_to_integers(input)
  "to_char(%s AT TIME ZONE 'UTC', 'YYYYMMDD')::integer || ',' || to_char(%s AT TIME ZONE 'UTC', 'HH24MISS')::integer || ',' || to_char(%s AT TIME ZONE 'UTC', 'US')::integer"%[input, input, input]
end

.with_connection(pg_connection_param, reset, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pg_versions/pg_versions.rb', line 45

def self.with_connection(pg_connection_param, reset, &block)
  if pg_connection_param.kind_of? ::PG::Connection
    if reset
      pg_connection_param.sync_reset
      pg_connection_param.exec("select;")
    end
    block.call(pg_connection_param)
  elsif pg_connection_param.respond_to? :call
    pg_connection_param.call(reset, &block)
  elsif pg_connection_param.kind_of?(String) or pg_connection_param.kind_of?(Hash)
    Thread.handle_interrupt(Object => :never) {
      begin
        pg_connection = nil
        Thread.handle_interrupt(Object => :immediate) {
          pg_connection = ::PG.connect(pg_connection_param)
          block.call(pg_connection)
        }
      ensure
        pg_connection&.close
      end
    }
  elsif defined?(ActiveRecord) and pg_connection_param.kind_of?(Class) and pg_connection_param <= ActiveRecord::Base
    pg_connection = pg_connection_param.connection.raw_connection
    if reset
      pg_connection.sync_reset
      pg_connection.exec("select;")
    end
    block.call(pg_connection)
  else
    raise InvalidParameters, "Invalid connection parameter (#{pg_connection_param.inspect}). Either pass PG::Connection object, url string, hash, ActiveRecord::Base class (or subclass) or call one of the ActiveRecord methods that come with PgVersions refinement."
  end
end