Class: Lapine::Configuration
- Inherits:
-
Object
- Object
- Lapine::Configuration
- Defined in:
- lib/lapine/configuration.rb
Instance Method Summary collapse
- #active_connection(name) ⇒ Object
- #channels_by_exchange_id ⇒ Object
- #cleanup_exchange(id) ⇒ Object
- #close_connections! ⇒ Object
- #connection_properties ⇒ Object
- #connection_props_for(name) ⇒ Object
- #connections ⇒ Object
- #exchange_properties ⇒ Object
-
#exchanges ⇒ Object
Exchanges need to be saved in a thread-local variable, rather than a fiber-local variable, because in the context of some applications (such as Sidekiq, which uses Celluloid) individual bits of work are done in fibers that are immediately reaped.
-
#initialize ⇒ Configuration
constructor
A new instance of Configuration.
- #register_channel(object_id, channel) ⇒ Object
Constructor Details
#initialize ⇒ Configuration
Returns a new instance of Configuration.
3 4 5 |
# File 'lib/lapine/configuration.rb', line 3 def initialize @active_connections = {} end |
Instance Method Details
#active_connection(name) ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/lapine/configuration.rb', line 43 def active_connection(name) conn = @active_connections[name] return conn if (conn && conn.connected?) @active_connections[name] = begin @conn = Bunny.new(connection_props_for(name)).tap do |conn| conn.start end end end |
#channels_by_exchange_id ⇒ Object
15 16 17 |
# File 'lib/lapine/configuration.rb', line 15 def channels_by_exchange_id @channels_by_exchange_id ||= {} end |
#cleanup_exchange(id) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/lapine/configuration.rb', line 23 def cleanup_exchange(id) return unless channels_by_exchange_id[id] channel = channels_by_exchange_id[id] channel.connection.logger.info "Closing channel for exchange #{id}, thread: #{Thread.current.object_id}" channel.close channels_by_exchange_id[id] = nil end |
#close_connections! ⇒ Object
54 55 56 57 58 |
# File 'lib/lapine/configuration.rb', line 54 def close_connections! @active_connections.values.map(&:close) @active_connections = {} Thread.current.thread_variable_set(:lapine_exchanges, nil) end |
#connection_properties ⇒ Object
11 12 13 |
# File 'lib/lapine/configuration.rb', line 11 def connection_properties @connection_properties ||= {} end |
#connection_props_for(name) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/lapine/configuration.rb', line 60 def connection_props_for(name) return unless connection_properties[name] connection_properties[name].dup.tap do |props| if defined?(Rails) props.merge!(logger: Rails.logger) end end end |
#connections ⇒ Object
7 8 9 |
# File 'lib/lapine/configuration.rb', line 7 def connections @connections ||= {} end |
#exchange_properties ⇒ Object
39 40 41 |
# File 'lib/lapine/configuration.rb', line 39 def exchange_properties @exchange_properties ||= {} end |
#exchanges ⇒ Object
Exchanges need to be saved in a thread-local variable, rather than a fiber-local variable, because in the context of some applications (such as Sidekiq, which uses Celluloid) individual bits of work are done in fibers that are immediately reaped.
34 35 36 37 |
# File 'lib/lapine/configuration.rb', line 34 def exchanges Thread.current.thread_variable_get(:lapine_exchanges) || Thread.current.thread_variable_set(:lapine_exchanges, {}) end |
#register_channel(object_id, channel) ⇒ Object
19 20 21 |
# File 'lib/lapine/configuration.rb', line 19 def register_channel(object_id, channel) channels_by_exchange_id[object_id] = channel end |