Class: PgVersions::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Defined Under Namespace

Classes: Subscription

Instance Method Summary collapse

Constructor Details

#initialize(connection = nil) ⇒ Connection

Returns a new instance of Connection.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/pg_versions/pg_versions.rb', line 147

# Creates the connection wrapper and starts its actor thread.
#
# The actor thread obtains a connection through PgVersions.with_connection
# and then loops, waiting on both the connection's socket and an internal
# pipe. Each byte written to the pipe signals one command queued in
# @actor_commands; the command is called on the actor thread with the
# connection and the subscriber map. Pending notifications are forwarded to
# every subscriber registered for the notification's channel.
#
# The constructor blocks until the actor reports that it either holds a
# connection or failed to start; a startup failure is re-raised here.
#
# @param connection [Object, nil] passed through to PgVersions.with_connection
# @raise [ConnectionAcquisitionFailedError] when raised by the actor thread
# @raise [StandardError] any other error raised before the connection was
#   handed to the actor loop
def initialize(connection=nil)
	@actor_commands = Queue.new
	actor_notify_r, @actor_notify_w = IO.pipe

	# Carries the startup outcome from the actor thread to the constructor:
	# false on success, the exception object on failure.
	startup_result = Queue.new
	@actor = Thread.new {
		connected = false
		begin
			PgVersions.with_connection(connection) { |pg_connection|
				connected = true
				startup_result << false
				subscribers = Hash.new { |h,k| h[k] = Set.new }
				loop {
					#TODO: handle errors
					reads,_writes,_errors = IO::select([pg_connection.socket_io, actor_notify_r])

					if reads.include?(pg_connection.socket_io)
						pg_connection.consume_input
					end

					if reads.include?(actor_notify_r)
						@actor_commands.shift.call(pg_connection, subscribers)
						# Drain the wake-up byte that accompanied this command.
						actor_notify_r.read(1)
					end

					while notification = pg_connection.notifies
						channel, payload = notification[:relname], notification[:extra]
						subscribers[channel].each { |subscriber|
							subscriber.notify(channel, PgVersions.string_to_version(payload))
						}
					end
				}
			}
		rescue ConnectionAcquisitionFailedError => e
			startup_result << e
		rescue StandardError => e
			# Once connected, the constructor has already returned and nobody
			# reads startup_result any more, so let the error surface on the thread.
			raise if connected
			# Before that point the constructor is still waiting; without this
			# it would block forever on startup_result.shift.
			startup_result << e
		end
	}

	startup_error = startup_result.shift
	if startup_error
		# No actor is running, so neither end of the pipe will ever be used.
		actor_notify_r.close
		@actor_notify_w.close
		raise startup_error
	end
end

Instance Method Details

#actor_call(&block) ⇒ Object



186
187
188
189
190
191
192
193
# File 'lib/pg_versions/pg_versions.rb', line 186

# Runs the given block on the actor thread and waits for its outcome.
#
# The block is wrapped in a command pushed onto @actor_commands, and a byte
# is written to @actor_notify_w to signal that a command is pending. The
# calling thread then blocks until the command has reported back.
#
# An error raised by the block is captured on the actor thread and re-raised
# in the caller. Without that, the error would unwind the actor thread before
# anything was pushed onto the result queue, leaving the caller blocked forever.
#
# @yieldparam pg_connection the connection passed to the command by the actor
# @yieldparam subscribers the subscriber map passed to the command by the actor
# @return [Object] the block's return value
# @raise [StandardError] whatever StandardError the block raised
def actor_call(&block)
	done = Queue.new
	@actor_commands << proc { |pg_connection, subscribers|
		outcome = begin
			[:ok, block.call(pg_connection, subscribers)]
		rescue StandardError => e
			[:error, e]
		end
		done << outcome
	}
	@actor_notify_w.write('!')
	status, value = done.shift
	raise value if status == :error
	value
end

#bump(*channels) ⇒ Object



196
197
198
199
200
# File 'lib/pg_versions/pg_versions.rb', line 196

# Calls PgVersions.bump for the given channels on the actor thread, using the
# actor's connection, and returns whatever that call returns.
#
# @param channels [Array] channels forwarded to PgVersions.bump
# @return [Object] the result of PgVersions.bump
def bump(*channels)
	actor_call do |actor_connection, _|
		PgVersions.bump(channels, connection: actor_connection)
	end
end

#read(*channels) ⇒ Object



203
204
205
206
207
# File 'lib/pg_versions/pg_versions.rb', line 203

# Calls PgVersions.read for the given channels on the actor thread, using the
# actor's connection, and returns whatever that call returns.
#
# @param channels [Array] channels forwarded to PgVersions.read
# @return [Object] the result of PgVersions.read
def read(*channels)
	actor_call do |actor_connection, _|
		PgVersions.read(channels, connection: actor_connection)
	end
end

#subscribe(*channels, known: {}) ⇒ Object



210
211
212
213
214
# File 'lib/pg_versions/pg_versions.rb', line 210

# Builds a Subscription bound to this connection, subscribes it to the given
# channels and returns it.
#
# @param channels [Array] channel names; nested arrays are flattened
# @param known [Hash] forwarded unchanged to Subscription#subscribe
# @return [Subscription] the newly created subscription
def subscribe(*channels, known: {})
	Subscription.new(self).tap do |new_subscription|
		new_subscription.subscribe(channels.flatten, known: known)
	end
end