Class: Lolitra::RabbitmqBus
- Inherits:
-
Object
- Object
- Lolitra::RabbitmqBus
- Defined in:
- lib/lolitra/rabbitmq_bus.rb
Constant Summary collapse
- SUBSCRIBE_OPTIONS =
{:durable => true}
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
-
#exchange_dead_letter ⇒ Object
Returns the value of attribute exchange_dead_letter.
-
#options ⇒ Object
Returns the value of attribute options.
-
#subscribers ⇒ Object
Returns the value of attribute subscribers.
Instance Method Summary collapse
- #disconnect(&block) ⇒ Object
-
#initialize(hash = {}) ⇒ RabbitmqBus
constructor
A new instance of RabbitmqBus.
- #process_deadletters(handler_class) ⇒ Object
- #publish(message) ⇒ Object
- #purge_deadletters(handler_class) ⇒ Object
- #remove_next_deadletter(handler_class) ⇒ Object
- #subscribe(message_class, handler_class) ⇒ Object
- #unsubscribe(handler_class, &block) ⇒ Object
Constructor Details
permalink #initialize(hash = {}) ⇒ RabbitmqBus
Returns a new instance of RabbitmqBus.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 11 def initialize(hash = {}) Lolitra::MessageHandlerManager.bus = self self. = { :queue_prefix => "", :queue_suffix => "", :exchange_dead_suffix => ".dead", :exchange_dead_params => {}, :queue_params => {}, :queue_dead_suffix => ".dead", :queue_dead_params => {}, :no_consume => false, }.merge(hash.delete(:options) || {}) self.[:queue_params][:arguments] = {} unless self.[:queue_params][:arguments] self.[:queue_params][:arguments] = { "x-dead-letter-exchange" => "#{hash[:exchange]}#{@options[:exchange_dead_suffix]}" }.merge(self.[:queue_params][:arguments]) @channels = {} @params = hash.reject { |key, value| !value } raise "no :exchange specified" unless hash[:exchange] AMQP::Utilities::EventLoopHelper.run do self.connection = AMQP.start(@params) do |connection| Lolitra::logger.info("Connected to rabbitmq.") channel = create_channel(connection) do |channel| begin self.exchange = channel.topic(@params[:exchange], :durable => true) self.exchange_dead_letter = channel.topic("#{@params[:exchange]}#{@options[:exchange_dead_suffix]}", :durable => true) @params[:subscribers].each do |handler| Lolitra::MessageHandlerManager.register_subscriber(handler) end rescue => e Lolitra::log_exception(e) end end end self.connection.on_tcp_connection_loss do |connection, settings| # reconnect in 10 seconds, without enforcement Lolitra::logger.info("Connection loss. Trying to reconnect in 10 secs if needed.") connection.reconnect(false, 10) end end end |
Instance Attribute Details
permalink #connection ⇒ Object
Returns the value of attribute connection.
3 4 5 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 3 def connection @connection end |
permalink #exchange ⇒ Object
Returns the value of attribute exchange.
4 5 6 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 4 def exchange @exchange end |
permalink #exchange_dead_letter ⇒ Object
Returns the value of attribute exchange_dead_letter.
5 6 7 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 5 def exchange_dead_letter @exchange_dead_letter end |
permalink #options ⇒ Object
Returns the value of attribute options.
6 7 8 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 6 def @options end |
permalink #subscribers ⇒ Object
Returns the value of attribute subscribers.
7 8 9 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 7 def subscribers @subscribers end |
Instance Method Details
permalink #disconnect(&block) ⇒ Object
[View source]
59 60 61 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 59 def disconnect(&block) self.connection.close(&block) end |
permalink #process_deadletters(handler_class) ⇒ Object
[View source]
92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 92 def process_deadletters(handler_class) queue_name_dead = generate_queue_name_dead(handler_class) = SUBSCRIBE_OPTIONS create_channel(self.connection) do |channel| begin channel.queue(queue_name_dead, .merge(@options[:queue_dead_params])) do |queue| recursive_pop(channel, queue, handler_class) end rescue => e Lolitra::log_exception(e) end end true end |
permalink #publish(message) ⇒ Object
[View source]
67 68 69 70 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 67 def publish() #TODO: if exchange channel is closed doesn't log anything self.exchange.publish(.marshall, :routing_key => .class., :timestamp => Time.now.to_i) end |
permalink #purge_deadletters(handler_class) ⇒ Object
[View source]
107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 107 def purge_deadletters(handler_class) queue_name_dead = generate_queue_name_dead(handler_class) = SUBSCRIBE_OPTIONS create_channel(self.connection) do |channel| begin channel.queue(queue_name_dead, .merge(@options[:queue_dead_params])) do |queue| purge_queue(queue) end rescue => e Lolitra::log_exception(e) end end true end |
permalink #remove_next_deadletter(handler_class) ⇒ Object
[View source]
122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 122 def remove_next_deadletter(handler_class) queue_name_dead = generate_queue_name_dead(handler_class) = SUBSCRIBE_OPTIONS create_channel(self.connection) do |channel| begin channel.queue(queue_name_dead, .merge(@options[:queue_dead_params])) do |queue| queue.pop end rescue => e Lolitra::log_exception(e) end end true end |
permalink #subscribe(message_class, handler_class) ⇒ Object
[View source]
63 64 65 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 63 def subscribe(, handler_class) create_queue(, handler_class, SUBSCRIBE_OPTIONS) end |
permalink #unsubscribe(handler_class, &block) ⇒ Object
[View source]
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 72 def unsubscribe(handler_class, &block) queue_name = generate_queue_name(handler_class) begin create_channel(self.connection) do |channel| queue = channel.queue(queue_name, SUBSCRIBE_OPTIONS) do |queue| begin queue.delete do block.call(handler_class, true) end rescue => e Lolitra::log_exception(e) block.call(handler_class, false) end end end rescue => e Lolitra::log_exception(e) end end |