Class: PgVersions::Connection

Inherits:
Object
  • Object
  • PgVersions::Connection
show all
Defined in:
lib/pg_versions/pg_versions.rb

Defined Under Namespace

Classes: Subscription

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Connection

Returns a new instance of Connection.



379
380
381
# File 'lib/pg_versions/pg_versions.rb', line 379

# Sets up a new Connection backed by a freshly created ConnectionInner.
#
# The inner object is stored in @inner; #bump, #close and #read delegate to
# it directly, and #process and #subscribe operate on it as well.
def initialize
	@inner = ConnectionInner.new
end

Instance Method Details

#bump(*channels) ⇒ Object



459
460
461
# File 'lib/pg_versions/pg_versions.rb', line 459

# Requests a bump of the given channels by delegating to the inner
# connection object.
#
# The splatted arguments are passed to @inner.bump as one Array.
#
# NOTE(review): the return value is whatever @inner.bump returns. Judging by
# #process, which pushes a slice of the bumped versions back to each
# requester, this is presumably a Hash of channel => new version; confirm
# against ConnectionInner.
def bump(*channels)
	@inner.bump(channels)
end

#close ⇒ Object



454
455
456
# File 'lib/pg_versions/pg_versions.rb', line 454

# Closes the connection by delegating to the inner connection object.
#
# NOTE(review): #process leaves its loop once @inner.is_closing is true;
# presumably this call is what sets that flag; confirm in ConnectionInner.
def close
	@inner.close
end

#process(connection_param = nil, autoreconnect = true, &block) ⇒ Object



384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/pg_versions/pg_versions.rb', line 384

# Runs the connection's processing loop on a PostgreSQL connection.
#
# Inside @inner.process, a connection is obtained via
# PgVersions.with_connection. The loop then keeps the server-side
# LISTEN/UNLISTEN set in sync with @inner.get_channels, executes queued bump
# and read requests, forwards incoming notifications to @inner.notify, and
# blocks in IO.select until either the Pg socket or the wake-up IO handed in
# by @inner.process becomes readable. The loop ends when @inner.is_closing
# is true.
#
# @param connection_param [Object, nil] forwarded to
#   PgVersions.with_connection. When it is a ::PG::Connection, connection
#   errors are re-raised instead of retried.
# @param autoreconnect [Boolean] when false, connection errors are re-raised
#   instead of retried.
# @yield alternative to connection_param: the block itself is forwarded to
#   PgVersions.with_connection as the connection parameter.
# @raise [RuntimeError] if both connection_param and a block are given, or
#   if @inner.process yields a falsy wake-up IO.
# @raise [::PG::ConnectionBad, ::PG::UnableToSend] when the connection fails
#   and retrying is not permitted (see the parameters above).
# @return [Object, nil] whatever @inner.process returns, or nil when a
#   connection error arrives while the connection is already closing.
def process(connection_param=nil, autoreconnect=true, &block)
	raise "Both 'connection_param' and a block were given. Don't know which to use."  if !connection_param.nil? && !block.nil?
	connection_param ||= block

	retry_on_exceptions = [ ::PG::ConnectionBad, ::PG::UnableToSend ]
	retry_delay = 0  # milliseconds; divided by 1000.0 before sleeping

	@inner.process do |notification_r|
		raise  unless notification_r
		PgVersions.with_connection(connection_param, true) do |pg_connection|

			# (Re)establish LISTENs for everything currently subscribed and
			# publish the current versions, so that after a (re)connect the
			# subscribers see the present state.
			listening_to_channels = @inner.get_channels
			listening_to_channels.each { |channel|
				pg_connection.exec("LISTEN #{::PG::Connection.quote_ident(channel)}")
			}
			PgVersions.read(pg_connection, listening_to_channels).each { |channel, version|
				@inner.notify(channel, version)
			}

			loop {
				# Bring the server-side LISTEN set in line with the wanted set.
				channels_to_listen_to = @inner.get_channels
				(listening_to_channels - channels_to_listen_to).each { |removed_channel|
					pg_connection.exec("UNLISTEN #{::PG::Connection.quote_ident(removed_channel)}")
				}
				(channels_to_listen_to - listening_to_channels).each { |added_channel|
					pg_connection.exec("LISTEN #{::PG::Connection.quote_ident(added_channel)}")
				}
				listening_to_channels = channels_to_listen_to

				# Bump every requested channel in one call, then hand each
				# requester only the versions of the channels it asked for.
				@inner.taking_bumps { |bumps|
					channels_to_bump = bumps.map(&:last).flatten.uniq
					bumped_versions = PgVersions.bump(pg_connection, channels_to_bump)
					bumps.each { |bumper, channels|
						bumper.push bumped_versions.slice(*channels)  unless bumper.nil?
					}
				}

				# Same batching for reads. Each request carries an Array of
				# channels, so flatten before de-duplicating (as for bumps) to
				# pass PgVersions.read a flat list of unique channel names.
				@inner.taking_reads { |reads|
					channels_to_read = reads.map(&:last).flatten.uniq
					read_versions = PgVersions.read(pg_connection, channels_to_read)
					reads.each { |reader, channels|
						reader.push read_versions.slice(*channels)
					}
				}

				break if @inner.is_closing

				# Deliver every notification that has already been received.
				while notification = pg_connection.notifies
					channel, payload = notification[:relname], notification[:extra]
					@inner.notify(channel, PgVersions.string_to_version(payload))
				end

				#TODO: handle errors
				# Sleep until the server sends data or the wake-up IO is readable.
				reads,_writes,_errors = IO::select([pg_connection.socket_io, notification_r])
				pg_connection.consume_input  if reads.include?(pg_connection.socket_io)
				notification_r.read(1)  if reads.include?(notification_r)  #TODO: read everything that can be read here

			}
		end
	rescue *retry_on_exceptions => error
		# A caller-supplied PG::Connection is never retried, and neither is
		# anything when autoreconnect is off.
		raise  if connection_param.kind_of?(::PG::Connection) || !autoreconnect
		return if @inner.is_closing
		$stderr.puts "Pg connection failed (retrying in #{retry_delay/1000.0}s):\n\t#{error.message}"
		sleep retry_delay/1000.0
		# Back-off schedule in milliseconds: 0 -> 100 -> 1000 -> 2000 (capped).
		retry_delay = { 0=>100, 100=>1000, 1000=>2000, 2000=>2000 }[retry_delay]
		retry
	end
end

#read(*channels) ⇒ Object



464
465
466
# File 'lib/pg_versions/pg_versions.rb', line 464

# Requests the versions of the given channels by delegating to the inner
# connection object.
#
# The splatted arguments are passed to @inner.read as one Array.
#
# NOTE(review): the return value is whatever @inner.read returns. Judging by
# #process, which pushes a slice of the read versions back to each
# requester, this is presumably a Hash of channel => version; confirm
# against ConnectionInner.
def read(*channels)
	@inner.read(channels)
end

#subscribe(*channels, known: {}, batch_delay: 0.01) ⇒ Object



469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/pg_versions/pg_versions.rb', line 469

# Creates a Subscription on this connection and subscribes it to the given
# channels.
#
# @param channels [Array] channel names; nested arrays are flattened.
# @param known [Hash] forwarded to Subscription#subscribe (presumably the
#   versions the caller already knows per channel; confirm in Subscription).
# @param batch_delay [Numeric] forwarded to Subscription.new (units are not
#   visible here; presumably seconds).
# @yield [subscription] optional; when given, the subscription is dropped
#   once the block finishes, whether it returns or raises.
# @return [Object] the block's value when a block is given, otherwise the
#   new Subscription (which the caller is then responsible for dropping).
def subscribe(*channels, known: {}, batch_delay: 0.01)
	new_subscription = Subscription.new(@inner, batch_delay)
	new_subscription.subscribe(channels.flatten, known: known)
	return new_subscription  unless block_given?

	# Asynchronous interrupts are deferred around the begin/ensure so that
	# the drop cannot be skipped; they are re-enabled only while the caller's
	# block is running.
	Thread.handle_interrupt(Object => :never) do
		begin
			Thread.handle_interrupt(Object => :immediate) { yield new_subscription }
		ensure
			new_subscription.drop
		end
	end
end