Class: PgVersions::Connection::Subscription
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - PgVersions::Connection::Subscription
- Defined in: lib/pg_versions/pg_versions.rb
Instance Method Summary
- #bump(*channels, notify: true) ⇒ Object
- #drop ⇒ Object
- #initialize(connection) ⇒ Subscription (constructor): returns a new instance of Subscription.
- #notify(channel, payload) ⇒ Object
- #read(*channels, notify: true) ⇒ Object
- #subscribe(channels, known: {}) ⇒ Object
- #unsubscribe(*channels) ⇒ Object
- #update_already_known_versions(new_already_known_versions) ⇒ Object
- #wait(new_already_known_versions = {}) ⇒ Object
Constructor Details
#initialize(connection) ⇒ Subscription
Returns a new instance of Subscription.
219 220 221 222 223 224 |
# File 'lib/pg_versions/pg_versions.rb', line 219
# Builds a subscription bound to the given connection.
#
# State set up here:
# * @connection             - the object later used via #actor_call
# * @channels               - per-channel reference counts (missing keys read as 0)
# * @already_known_versions - per-channel last seen version; a missing key is
#                             filled with its own fresh empty array on first read
# * @notifications          - queue of [channel, payload] pairs consumed by #wait
#
# @param connection [Object] stored as-is; must respond to #actor_call for the
#   other methods of this class to work
def initialize(connection)
  @connection = connection
  @channels = Hash.new(0)
  @already_known_versions = Hash.new do |known, channel|
    known[channel] = []
  end
  @notifications = Queue.new
end
Instance Method Details
#bump(*channels, notify: true) ⇒ Object
277 278 279 280 281 282 283 284 |
# File 'lib/pg_versions/pg_versions.rb', line 277
# Bumps the versions of the given channels through PgVersions.bump, executed
# inside the connection's actor_call block.
#
# @param channels [Array] channels to bump; when none are given, every channel
#   currently tracked in @channels is used
# @param notify [Boolean] when false, the returned versions are recorded via
#   #update_already_known_versions before returning
# @return [Object] whatever the actor_call block yields back (the result of
#   PgVersions.bump)
def bump(*channels, notify: true)
  targets = channels.empty? ? @channels.keys : channels
  versions = @connection.actor_call do |pg_connection, _subscribers|
    PgVersions.bump(targets, connection: pg_connection)
  end
  update_already_known_versions(versions) unless notify
  versions
end
#drop ⇒ Object
305 306 307 308 |
# File 'lib/pg_versions/pg_versions.rb', line 305
# Shuts this subscription down.
#
# First enqueues the [nil, nil] sentinel on @notifications, then releases
# every reference this subscription holds on every channel.
#
# @channels stores a reference count per channel, and #unsubscribe releases
# one reference per listed occurrence. Each channel is therefore listed as
# many times as it is currently counted, so channels that were subscribed
# more than once are fully released as well.
def drop
  @notifications << [nil, nil]
  unsubscribe(@channels.flat_map { |channel, count| [channel] * count })
end
#notify(channel, payload) ⇒ Object
300 301 302 |
# File 'lib/pg_versions/pg_versions.rb', line 300
# Enqueues a [channel, payload] pair on this subscription's notification queue.
#
# @param channel [Object] channel identifier
# @param payload [Object] value delivered with the channel
# @return [Queue] the notification queue
def notify(channel, payload)
  entry = [channel, payload]
  @notifications.push(entry)
end
#read(*channels, notify: true) ⇒ Object
267 268 269 270 271 272 273 274 |
# File 'lib/pg_versions/pg_versions.rb', line 267
# Reads the versions of the given channels through PgVersions.read, executed
# inside the connection's actor_call block.
#
# @param channels [Array] channels to read; when none are given, every channel
#   currently tracked in @channels is used
# @param notify [Boolean] when false, the returned versions are recorded via
#   #update_already_known_versions before returning
# @return [Object] whatever the actor_call block yields back (the result of
#   PgVersions.read)
def read(*channels, notify: true)
  targets = channels.empty? ? @channels.keys : channels
  versions = @connection.actor_call do |pg_connection, _subscribers|
    PgVersions.read(targets, connection: pg_connection)
  end
  update_already_known_versions(versions) unless notify
  versions
end
#subscribe(channels, known: {}) ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/pg_versions/pg_versions.rb', line 227
# Adds one reference to each given channel and, for channels that were not
# referenced before, registers this subscription with the connection.
#
# For every newly referenced channel, inside the connection's actor_call block:
# this object is appended to subscribers[channel], a LISTEN statement is
# executed if it is the only entry there, and the current versions returned by
# PgVersions.read are fed to #notify.
#
# @param channels [Object, Array] a single channel or a (possibly nested) list
# @param known [Hash] versions passed to #update_already_known_versions first
# @return [Object, nil] nil when no channel was newly referenced, otherwise
#   the result of actor_call
def subscribe(channels, known: {})
  update_already_known_versions(known)

  # Bump every reference count; keep only channels that just went 0 -> 1.
  fresh = [channels].flatten.select { |channel| (@channels[channel] += 1) == 1 }
  return if fresh.empty?

  @connection.actor_call do |pg_connection, subscribers|
    fresh.each do |channel|
      subscribers[channel] << self
      only_listener = subscribers[channel].size == 1
      pg_connection.exec("LISTEN #{PG::Connection.quote_ident(channel)}") if only_listener
    end
    PgVersions.read(fresh, connection: pg_connection).each_pair do |channel, version|
      notify(channel, version)
    end
  end
end
#unsubscribe(*channels) ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/pg_versions/pg_versions.rb', line 247
# Releases one reference per listed channel occurrence. Channels whose
# reference count reaches zero are removed from @channels and this
# subscription is removed from the connection's subscribers for them; when no
# subscriber is left for a channel an UNLISTEN statement is executed and the
# channel's subscriber entry is deleted.
#
# The whole request is validated before any state is changed, so a failing
# call leaves @channels exactly as it was (no negative counts are stored and
# no channel is forgotten locally without being released on the connection).
#
# @param channels [Array] channel names; nested arrays are flattened
# @raise [RuntimeError] if a channel is listed more times than it is
#   currently referenced by this subscription
def unsubscribe(*channels)
  channels = channels.flatten

  # Validate first: count how often each channel is requested and compare
  # against the references actually held.
  requested = channels.each_with_object(Hash.new(0)) { |channel, counts| counts[channel] += 1 }
  requested.each { |channel, count|
    if count > @channels.fetch(channel, 0)
      raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to" % [channel]
    end
  }

  # Now it is safe to mutate: decrement, and collect channels that hit zero.
  released = channels.select { |channel|
    remaining = @channels[channel] - 1
    if remaining == 0
      @channels.delete(channel)
      true
    else
      @channels[channel] = remaining
      false
    end
  }

  @connection.actor_call { |pg_connection, subscribers|
    released.each { |channel|
      subscribers[channel].delete(self)
      if subscribers[channel].size == 0
        pg_connection.exec("UNLISTEN #{PG::Connection.quote_ident(channel)}")
        subscribers.delete(channel)
      end
    }
  }
end
#update_already_known_versions(new_already_known_versions) ⇒ Object
311 312 313 314 315 |
# File 'lib/pg_versions/pg_versions.rb', line 311
# Merges the given versions into @already_known_versions, keeping for each
# channel whichever version compares greater under <=>. Entries that are not
# strictly greater than the stored one are ignored.
#
# @param new_already_known_versions [#each] channel/version pairs
# @return [Object] the result of calling #each on the argument
def update_already_known_versions(new_already_known_versions)
  new_already_known_versions.each do |channel, version|
    current = @already_known_versions[channel]
    next unless (version <=> current) == 1
    @already_known_versions[channel] = version
  end
end
#wait(new_already_known_versions = {}) ⇒ Object
287 288 289 290 291 292 293 294 295 296 297 |
# File 'lib/pg_versions/pg_versions.rb', line 287
# Blocks until the notification queue yields a version that is newer than the
# one already known for its channel, then records and returns it.
#
# Queue entries whose version is not strictly newer (by <=>) are discarded.
# An entry with a nil/false channel is the termination marker.
#
# @param new_already_known_versions [Hash] versions merged in via
#   #update_already_known_versions before waiting
# @return [Notification, nil] the next fresh notification, or nil on termination
def wait(new_already_known_versions = {})
  update_already_known_versions(new_already_known_versions)
  loop do
    channel, version = @notifications.pop
    return nil unless channel # termination marker

    next unless (@already_known_versions[channel] <=> version) == -1

    @already_known_versions[channel] = version
    return Notification.new(channel, version)
  end
end