Class: Lapine::Configuration

Inherits:
Object
  • Object
show all
Defined in:
lib/lapine/configuration.rb

Instance Method Summary

Constructor Details

#initialize ⇒ Configuration

Returns a new instance of Configuration.



3
4
5
# File 'lib/lapine/configuration.rb', line 3

# Builds a configuration whose cache of active connections starts out empty.
def initialize
  # Cache consulted and filled by #active_connection, and replaced with a
  # fresh hash by #close_connections!.
  @active_connections = {}
end

Instance Method Details

#active_connection(name) ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/lapine/configuration.rb', line 43

# Returns a started connection for +name+, reusing the cached one for as long
# as it still reports itself as connected.
#
# When there is no cached connection, or the cached one is no longer
# connected, a new +Bunny+ session is built from +connection_props_for(name)+,
# started, stored in +@active_connections+ under +name+ and returned.
#
# @param name [Object] key identifying the connection
# @return [Object] the cached or freshly started connection
def active_connection(name)
  cached = @active_connections[name]
  return cached if cached && cached.connected?

  # The session is held only in the cache, so #close_connections! drops the
  # last reference to it when it replaces the cache.
  @active_connections[name] = Bunny.new(connection_props_for(name)).tap(&:start)
end

#channels_by_exchange_id ⇒ Object



15
16
17
# File 'lib/lapine/configuration.rb', line 15

# Registry of channels keyed by exchange id, created empty on first use.
# Written by #register_channel and read by #cleanup_exchange.
#
# @return [Hash] the memoized registry
def channels_by_exchange_id
  @channels_by_exchange_id = {} unless @channels_by_exchange_id
  @channels_by_exchange_id
end

#cleanup_exchange(id) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/lapine/configuration.rb', line 23

# Closes the channel registered for exchange +id+ and removes it from the
# registry. Does nothing when no channel is registered under +id+.
#
# The closure is logged at info level through the logger of the channel's
# connection before the channel is closed.
#
# @param id [Object] key the channel was registered under
# @return [nil]
def cleanup_exchange(id)
  channel = channels_by_exchange_id[id]
  return unless channel

  channel.connection.logger.info "Closing channel for exchange #{id}, thread: #{Thread.current.object_id}"
  channel.close
  # Delete the key instead of storing nil under it, so the registry does not
  # accumulate one dead entry per exchange that has ever been cleaned up.
  channels_by_exchange_id.delete(id)
  nil
end

#close_connections! ⇒ Object



54
55
56
57
58
# File 'lib/lapine/configuration.rb', line 54

# Closes every cached connection, replaces the connection cache with a fresh
# empty hash, and sets the current thread's +:lapine_exchanges+ thread
# variable to nil.
def close_connections!
  open_connections = @active_connections.values
  open_connections.each(&:close)
  @active_connections = {}
  Thread.current.thread_variable_set(:lapine_exchanges, nil)
end

#connection_properties ⇒ Object



11
12
13
# File 'lib/lapine/configuration.rb', line 11

# Per-connection property sets, created empty on first use.
# #connection_props_for looks entries up here by connection name.
#
# @return [Hash] the memoized hash
def connection_properties
  @connection_properties = {} unless @connection_properties
  @connection_properties
end

#connection_props_for(name) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/lapine/configuration.rb', line 60

# Builds the properties used to open the connection called +name+.
#
# The stored entry from #connection_properties is copied, so the stored
# entry is not modified. When the +Rails+ constant is defined,
# +Rails.logger+ is added to the copy under +:logger+.
#
# @param name [Object] key into #connection_properties
# @return [Hash, nil] the copied properties, or nil when nothing is stored
#   under +name+
def connection_props_for(name)
  stored = connection_properties[name]
  return unless stored

  props = stored.dup
  props.merge!(logger: Rails.logger) if defined?(Rails)
  props
end

#connections ⇒ Object



7
8
9
# File 'lib/lapine/configuration.rb', line 7

# Returns the memoized +@connections+ hash, creating an empty one on first use.
#
# @return [Hash] the memoized hash
def connections
  @connections = {} unless @connections
  @connections
end

#exchange_properties ⇒ Object



39
40
41
# File 'lib/lapine/configuration.rb', line 39

# Returns the memoized +@exchange_properties+ hash, creating an empty one on
# first use.
#
# @return [Hash] the memoized hash
def exchange_properties
  @exchange_properties = {} unless @exchange_properties
  @exchange_properties
end

#exchanges ⇒ Object

Exchanges need to be saved in a thread-local variable, rather than a fiber-local variable, because in the context of some applications (such as Sidekiq, which uses Celluloid) individual bits of work are done in fibers that are immediately reaped.



34
35
36
37
# File 'lib/lapine/configuration.rb', line 34

# Returns the current thread's exchange registry, creating an empty hash on
# first use.
#
# The registry is stored in a thread variable (+:lapine_exchanges+) rather
# than a fiber-local one, because some hosts (such as Sidekiq, which uses
# Celluloid) run individual units of work in fibers that are reaped
# immediately.
#
# @return [Hash] the registry held by the calling thread
def exchanges
  thread = Thread.current
  registry = thread.thread_variable_get(:lapine_exchanges)
  return registry if registry

  thread.thread_variable_set(:lapine_exchanges, {})
end

#register_channel(object_id, channel) ⇒ Object



19
20
21
# File 'lib/lapine/configuration.rb', line 19

# Records +channel+ in #channels_by_exchange_id under +object_id+, replacing
# any channel already stored under that key.
#
# Note that the +object_id+ parameter shadows +Object#object_id+ inside this
# method.
#
# @param object_id [Object] key to register the channel under
# @param channel [Object] the channel to remember
# @return [Object] the +channel+ that was passed in
def register_channel(object_id, channel)
  registry = channels_by_exchange_id
  registry.store(object_id, channel)
end