Class: PgVersions::Connection::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Subscription

Returns a new instance of Subscription.



219
220
221
222
223
224
# File 'lib/pg_versions/pg_versions.rb', line 219

def initialize(connection)
	@connection = connection
	@notifications = Queue.new
	@already_known_versions = Hash.new { |h,k| h[k] = [] }
	@channels = Hash.new(0)
end

Instance Method Details

#bump(*channels, notify: true) ⇒ Object



277
278
279
280
281
282
283
284
# File 'lib/pg_versions/pg_versions.rb', line 277

def bump(*channels, notify: true)
	channels = @channels.keys  if channels.size == 0
	versions = @connection.actor_call { |pg_connection, subscribers|
		PgVersions.bump(channels, connection: pg_connection)
	}
	update_already_known_versions(versions)  if not notify
	versions
end

#dropObject



305
306
307
308
# File 'lib/pg_versions/pg_versions.rb', line 305

def drop
	@notifications << [nil, nil]
	unsubscribe(@channels.keys)
end

#notify(channel, payload) ⇒ Object



300
301
302
# File 'lib/pg_versions/pg_versions.rb', line 300

def notify(channel, payload)
	@notifications << [channel, payload]
end

#read(*channels, notify: true) ⇒ Object



267
268
269
270
271
272
273
274
# File 'lib/pg_versions/pg_versions.rb', line 267

def read(*channels, notify: true)
	channels = @channels.keys  if channels.size == 0
	versions = @connection.actor_call { |pg_connection, subscribers|
		PgVersions.read(channels, connection: pg_connection)
	}
	update_already_known_versions(versions)  if not notify
	versions
end

#subscribe(channels, known: {}) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/pg_versions/pg_versions.rb', line 227

def subscribe(channels, known: {})
	update_already_known_versions(known)
	channels = [channels].flatten
	channels.select! { |channel|
		(@channels[channel] += 1) == 1
	}
	if channels.size > 0
		@connection.actor_call { |pg_connection, subscribers|
			channels.each { |channel|
				subscribers[channel] << self
				pg_connection.exec("LISTEN #{PG::Connection.quote_ident(channel)}")  if subscribers[channel].size == 1
			}
			PgVersions.read(channels, connection: pg_connection).each_pair { |channel, version|
				notify(channel, version)
			}
		}
	end
end

#unsubscribe(*channels) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/pg_versions/pg_versions.rb', line 247

def unsubscribe(*channels)
	channels = [channels].flatten
	channels.select! { |channel|
		@channels[channel] -= 1
		raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]  if @channels[channel] < 0
		@channels.delete(channel)  if @channels[channel] == 0
		not @channels.has_key?(channel)
	}
	@connection.actor_call { |pg_connection, subscribers|
		channels.each { |channel|
			subscribers[channel].delete(self)
			if subscribers[channel].size == 0
				pg_connection.exec("UNLISTEN #{PG::Connection.quote_ident(channel)}")
				subscribers.delete(channel)
			end
		}
	}
end

#update_already_known_versions(new_already_known_versions) ⇒ Object



311
312
313
314
315
# File 'lib/pg_versions/pg_versions.rb', line 311

def update_already_known_versions(new_already_known_versions)
	new_already_known_versions.each { |channel, version|
		@already_known_versions[channel] = version  if (version <=> @already_known_versions[channel]) == 1
	}
end

#wait(new_already_known_versions = {}) ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
# File 'lib/pg_versions/pg_versions.rb', line 287

def wait(new_already_known_versions = {})
	update_already_known_versions(new_already_known_versions)
	loop {
		channel, version = @notifications.shift
		return nil  if not channel #termination
		if (@already_known_versions[channel] <=> version) == -1
			@already_known_versions[channel] = version
			return Notification.new(channel, version)
		end
	}
end