Method: Lolitra::RabbitmqBus#initialize
- Defined in:
- lib/lolitra/rabbitmq_bus.rb
#initialize(hash = {}) ⇒ RabbitmqBus
Returns a new instance of RabbitmqBus.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 11

# Configures the bus and opens the AMQP connection.
#
# Registers the new instance as the bus of Lolitra::MessageHandlerManager,
# merges the caller's :options over the built-in defaults, makes sure the
# queue parameters carry an "x-dead-letter-exchange" argument, and then starts
# the connection inside AMQP::Utilities::EventLoopHelper.run. Once connected,
# the main topic exchange and its dead-letter exchange are declared (durable)
# and every handler listed under :subscribers is registered.
#
# @param hash [Hash] parameters passed on to AMQP.start after entries with a
#   nil/false value have been dropped. NOTE: the :options key is removed from
#   this hash (the caller's object is mutated).
# @option hash [String] :exchange name of the topic exchange (required)
# @option hash [Hash] :options overrides for the default bus options below
# @option hash [Array] :subscribers handlers to register once the channel is
#   ready; may be omitted
# @raise [RuntimeError] if :exchange is missing
def initialize(hash = {})
  # Validate first: failing later would leave a half-initialized instance
  # registered as the global bus and build a dead-letter name from nil.
  raise "no :exchange specified" unless hash[:exchange]

  Lolitra::MessageHandlerManager.bus = self

  self.options = {
    :queue_prefix => "",
    :queue_suffix => "",
    :exchange_dead_suffix => ".dead",
    :exchange_dead_params => {},
    :queue_params => {},
    :queue_dead_suffix => ".dead",
    :queue_dead_params => {},
    :no_consume => false,
  }.merge(hash.delete(:options) || {})

  # Default the dead-letter exchange argument; arguments supplied by the
  # caller win because they are merged on top of the default.
  queue_params = options[:queue_params]
  queue_params[:arguments] = {
    "x-dead-letter-exchange" => "#{hash[:exchange]}#{options[:exchange_dead_suffix]}"
  }.merge(queue_params[:arguments] || {})

  @channels = {}
  # Drop nil/false entries so they are not handed to AMQP.start.
  @params = hash.reject { |_key, value| !value }

  AMQP::Utilities::EventLoopHelper.run do
    self.connection = AMQP.start(@params) do |connection|
      Lolitra.logger.info("Connected to rabbitmq.")
      create_channel(connection) do |channel|
        begin
          self.exchange = channel.topic(@params[:exchange], :durable => true)
          self.exchange_dead_letter = channel.topic(
            "#{@params[:exchange]}#{options[:exchange_dead_suffix]}",
            :durable => true
          )

          # :subscribers is optional; treat a missing list as empty instead
          # of raising (and logging) a NoMethodError on every connect.
          (@params[:subscribers] || []).each do |handler|
            Lolitra::MessageHandlerManager.register_subscriber(handler)
          end
        rescue => e
          # Errors raised inside the channel callback are logged, not re-raised.
          Lolitra.log_exception(e)
        end
      end
    end

    self.connection.on_tcp_connection_loss do |lost_connection, _settings|
      # reconnect in 10 seconds, without enforcement
      Lolitra.logger.info("Connection loss. Trying to reconnect in 10 secs if needed.")
      lost_connection.reconnect(false, 10)
    end
  end
end