Method: Lolitra::RabbitmqBus#initialize

Defined in:
lib/lolitra/rabbitmq_bus.rb

#initialize(hash = {}) ⇒ RabbitmqBus

Returns a new instance of RabbitmqBus.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lolitra/rabbitmq_bus.rb', line 11

def initialize(hash = {})
  Lolitra::MessageHandlerManager.bus = self

  self.options = {
    :queue_prefix => "",
    :queue_suffix => "",
    :exchange_dead_suffix => ".dead",
    :exchange_dead_params => {}, 
    :queue_params => {},
    :queue_dead_suffix => ".dead",
    :queue_dead_params => {},
    :no_consume => false,
  }.merge(hash.delete(:options) || {})

  self.options[:queue_params][:arguments] = {} unless self.options[:queue_params][:arguments]

  self.options[:queue_params][:arguments] = {
    "x-dead-letter-exchange" => "#{hash[:exchange]}#{@options[:exchange_dead_suffix]}"
  }.merge(self.options[:queue_params][:arguments])

  @channels = {}
  @params = hash.reject { |key, value| !value }
  raise "no :exchange specified" unless hash[:exchange]

  AMQP::Utilities::EventLoopHelper.run do
    self.connection = AMQP.start(@params) do |connection|
      Lolitra::logger.info("Connected to rabbitmq.")
      channel = create_channel(connection) do |channel|
        begin
          self.exchange = channel.topic(@params[:exchange], :durable => true)
          self.exchange_dead_letter = channel.topic("#{@params[:exchange]}#{@options[:exchange_dead_suffix]}", :durable => true)

          @params[:subscribers].each do |handler|
            Lolitra::MessageHandlerManager.register_subscriber(handler)
          end
        rescue => e
          Lolitra::log_exception(e)
        end
      end
    end
    self.connection.on_tcp_connection_loss do |connection, settings|
      # reconnect in 10 seconds, without enforcement
      Lolitra::logger.info("Connection loss. Trying to reconnect in 10 secs if needed.")
      connection.reconnect(false, 10)
    end
  end
end