Class: PgVersions::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Defined Under Namespace

Classes: Subscription

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Connection

Returns a new instance of Connection.



381
382
383
# File 'lib/pg_versions/pg_versions.rb', line 381

# Builds a Connection backed by a freshly created ConnectionInner, which is
# stored in @inner and used by every other method of this class.
def initialize
	@inner = ConnectionInner.new
end

Instance Method Details

#bump(*channels) ⇒ Object



461
462
463
# File 'lib/pg_versions/pg_versions.rb', line 461

# Delegates to the inner connection's #bump.
#
# The splatted channel arguments are handed over as a single array.
# NOTE(review): presumably this queues a version bump that the #process loop
# performs against the database — confirm in ConnectionInner.
#
# @param channels channel names given as separate arguments
# @return whatever @inner.bump returns
def bump(*channels)
	@inner.bump(channels)
end

#close ⇒ Object



456
457
458
# File 'lib/pg_versions/pg_versions.rb', line 456

# Delegates to the inner connection's #close.
#
# NOTE(review): presumably this sets the closing state that the #process loop
# checks through @inner.is_closing — confirm in ConnectionInner.
#
# @return whatever @inner.close returns
def close
	@inner.close
end

#process(connection_param = nil, autoreconnect = true, &block) ⇒ Object



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/pg_versions/pg_versions.rb', line 386

# Runs this connection's processing loop against a PostgreSQL connection.
#
# Inside the block handed to @inner.process, a connection is obtained through
# PgVersions.with_connection. After listening on the current channels and
# publishing their current versions via @inner.notify, the loop repeatedly:
#   1. issues UNLISTEN/LISTEN so the session matches @inner.get_channels,
#   2. serves the bump and read requests handed out by @inner.taking_bumps
#      and @inner.taking_reads,
#   3. stops once @inner.is_closing is true,
#   4. forwards every pending notification to @inner.notify,
#   5. blocks in IO.select until the PG socket or notification_r is readable.
#
# On ::PG::ConnectionBad or ::PG::UnableToSend the block body is retried after
# a delay of 0, 100, 1000 and then 2000 ms (staying at 2000 ms), unless
# retrying is disabled (see parameters) or the connection is closing.
#
# @param connection_param passed through to PgVersions.with_connection. When
#   it is a ::PG::Connection, connection errors are re-raised, not retried.
# @param autoreconnect when false, connection errors are re-raised.
# @param block substituted for connection_param (via ||=) when none was given.
# @raise [RuntimeError] if both connection_param and a block are given, or if
#   @inner.process yields no notification IO.
# @raise [PG::ConnectionBad, PG::UnableToSend] when the error is not retried.
def process(connection_param=nil, autoreconnect=true, &block)
	raise "Both 'connection_param' and a block were given. Don't know which to use."  if !connection_param.nil? && !block.nil?
	connection_param ||= block

	retry_on_exceptions = [ ::PG::ConnectionBad, ::PG::UnableToSend ]
	retry_delay = 0

	@inner.process do |notification_r|
		# An explicit message is used here: a bare `raise` would re-raise whatever
		# unrelated exception happens to be in $! when #process is called from
		# inside a caller's rescue clause.
		raise "No notification IO was yielded by the inner connection."  unless notification_r
		PgVersions.with_connection(connection_param, true) do |pg_connection|

			# Initial sync: listen on every known channel, then publish the
			# versions currently stored for them.
			listening_to_channels = @inner.get_channels
			listening_to_channels.each { |channel|
				pg_connection.exec("LISTEN #{::PG::Connection.quote_ident(channel)}")
			}
			PgVersions.read(pg_connection, listening_to_channels).each { |channel, version|
				@inner.notify(channel, version)
			}

			loop {
				# Reconcile the session's LISTEN set with the channels wanted now.
				channels_to_listen_to = @inner.get_channels
				(listening_to_channels - channels_to_listen_to).each { |removed_channel|
					pg_connection.exec("UNLISTEN #{::PG::Connection.quote_ident(removed_channel)}")
				}
				(channels_to_listen_to - listening_to_channels).each { |added_channel|
					pg_connection.exec("LISTEN #{::PG::Connection.quote_ident(added_channel)}")
				}
				listening_to_channels = channels_to_listen_to

				# Each entry is [bumper, channels]; all requested channels are bumped
				# in one call and every non-nil bumper receives only its own slice.
				@inner.taking_bumps { |bumps|
					channels_to_bump = bumps.map(&:last).flatten.uniq
					bumped_versions = PgVersions.bump(pg_connection, channels_to_bump)
					bumps.each { |bumper, channels|
						bumper.push bumped_versions.slice(*channels)  unless bumper.nil?
					}
				}

				# Each entry is [reader, channels]. The per-request channel arrays are
				# flattened, as for bumps above, so one flat, de-duplicated list is read.
				@inner.taking_reads { |reads|
					channels_to_read = reads.map(&:last).flatten.uniq
					read_versions = PgVersions.read(pg_connection, channels_to_read)
					reads.each { |reader, channels|
						reader.push read_versions.slice(*channels)
					}
				}

				break if @inner.is_closing

				# Drain every notification already received by the client library.
				while notification = pg_connection.notifies
					channel, payload = notification[:relname], notification[:extra]
					@inner.notify(channel, PgVersions.string_to_version(payload))
				end

				#TODO: handle errors
				# Sleep until either PostgreSQL sent data or the inner connection
				# signalled through notification_r.
				readable,_writes,_errors = IO::select([pg_connection.socket_io, notification_r])
				pg_connection.consume_input  if readable.include?(pg_connection.socket_io)
				notification_r.read(1)  if readable.include?(notification_r)  #TODO: read everything that can be read here

			}
		end
	rescue *retry_on_exceptions => error
		# A caller-supplied PG::Connection is never retried, and neither is
		# anything when autoreconnect is off.
		raise  if connection_param.kind_of?(::PG::Connection)  ||  !autoreconnect
		return if @inner.is_closing
		$stderr.puts "Pg connection failed (retrying in #{retry_delay/1000.0}s):\n\t#{error.message}"
		sleep retry_delay/1000.0
		# Back-off schedule in milliseconds; stays at 2000 once reached.
		retry_delay = { 0=>100, 100=>1000, 1000=>2000, 2000=>2000 }[retry_delay]
		# Re-runs the block body from the top, requesting a connection again.
		retry
	end
end

#read(*channels) ⇒ Object



466
467
468
# File 'lib/pg_versions/pg_versions.rb', line 466

# Delegates to the inner connection's #read.
#
# The splatted channel arguments are handed over as a single array.
# NOTE(review): presumably this requests the current versions of the channels
# from the #process loop — confirm in ConnectionInner.
#
# @param channels channel names given as separate arguments
# @return whatever @inner.read returns
def read(*channels)
	@inner.read(channels)
end

#subscribe(*channels, known: {}, batch_delay: 0.01) ⇒ Object



471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/pg_versions/pg_versions.rb', line 471

# Creates a Subscription on the inner connection and subscribes it to the
# given channels.
#
# Without a block the subscription is returned and the caller owns it. With a
# block the subscription is yielded, dropped once the block finishes (normally
# or by raising), and the block's value is returned.
#
# @param channels channel names; nested arrays are flattened
# @param known passed through to Subscription#subscribe as `known:`
# @param batch_delay passed through to Subscription.new
# @return the Subscription (no block) or the block's result (with a block)
def subscribe(*channels, known: {}, batch_delay: 0.01)
	subscription = Subscription.new(@inner, batch_delay)
	subscription.subscribe(channels.flatten, known: known)
	return subscription  unless block_given?

	# Asynchronous interrupts are deferred around the begin/ensure and only
	# re-enabled while the caller's block runs, so the drop in the ensure
	# clause is executed with interrupts masked.
	Thread.handle_interrupt(Object => :never) do
		begin
			Thread.handle_interrupt(Object => :immediate) do
				yield subscription
			end
		ensure
			subscription.drop
		end
	end
end