Class: Lolitra::RabbitmqBus

Inherits:
Object
  • Object
show all
Defined in:
lib/lolitra/rabbitmq_bus.rb

Constant Summary collapse

# Base options used when declaring queues; marks them as durable.
SUBSCRIBE_OPTIONS = { :durable => true }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hash = {}) ⇒ RabbitmqBus

Returns a new instance of RabbitmqBus.

[View source]

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lolitra/rabbitmq_bus.rb', line 11

# Builds the bus, registers it as the global message-handler bus and opens
# the AMQP connection inside the event loop.
#
# @param hash [Hash] settings; every entry with a truthy value (except
#   :options) is forwarded to +AMQP.start+. Keys read directly here:
#   * :exchange    (required) name of the topic exchange to declare
#   * :subscribers handler(s) registered once the channel is ready
#   * :options     overrides for the bus defaults listed below
# @raise [RuntimeError] when :exchange is missing
# @return [RabbitmqBus]
def initialize(hash = {})
  # Fail fast, before this instance is published as the global bus.
  raise "no :exchange specified" unless hash[:exchange]

  Lolitra::MessageHandlerManager.bus = self

  # Work on a copy so the caller's hash keeps its :options entry.
  settings = hash.dup
  overrides = settings.delete(:options) || {}

  self.options = {
    :queue_prefix => "",
    :queue_suffix => "",
    :exchange_dead_suffix => ".dead",
    :exchange_dead_params => {},
    :queue_params => {},
    :queue_dead_suffix => ".dead",
    :queue_dead_params => {},
    :no_consume => false,
  }.merge(overrides)

  # One name shared by the queue argument and the exchange declared below.
  dead_exchange_name = "#{settings[:exchange]}#{self.options[:exchange_dead_suffix]}"

  # Copy :queue_params before adding the dead-letter argument so a hash
  # supplied by the caller is not modified. Caller-provided arguments win
  # over the default "x-dead-letter-exchange".
  queue_params = self.options[:queue_params].dup
  queue_params[:arguments] = {
    "x-dead-letter-exchange" => dead_exchange_name
  }.merge(queue_params[:arguments] || {})
  self.options[:queue_params] = queue_params

  @channels = {}
  # Drop nil/false entries before handing the settings to AMQP.
  @params = settings.reject { |_key, value| !value }

  AMQP::Utilities::EventLoopHelper.run do
    self.connection = AMQP.start(@params) do |conn|
      Lolitra::logger.info("Connected to rabbitmq.")
      create_channel(conn) do |channel|
        begin
          self.exchange = channel.topic(@params[:exchange], :durable => true)
          self.exchange_dead_letter = channel.topic(dead_exchange_name, :durable => true)

          # Array() tolerates a missing :subscribers entry (and a single handler).
          Array(@params[:subscribers]).each do |handler|
            Lolitra::MessageHandlerManager.register_subscriber(handler)
          end
        rescue => e
          Lolitra::log_exception(e)
        end
      end
    end
    self.connection.on_tcp_connection_loss do |lost_connection, _settings|
      # reconnect in 10 seconds, without enforcement
      Lolitra::logger.info("Connection loss. Trying to reconnect in 10 secs if needed.")
      lost_connection.reconnect(false, 10)
    end
  end
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.


3
4
5
# File 'lib/lolitra/rabbitmq_bus.rb', line 3

# @return [Object] the value currently held in @connection (nil until assigned)
def connection; @connection; end

#exchange ⇒ Object

Returns the value of attribute exchange.


4
5
6
# File 'lib/lolitra/rabbitmq_bus.rb', line 4

# @return [Object] the value currently held in @exchange (nil until assigned)
def exchange; @exchange; end

#exchange_dead_letter ⇒ Object

Returns the value of attribute exchange_dead_letter.


5
6
7
# File 'lib/lolitra/rabbitmq_bus.rb', line 5

# @return [Object] the value currently held in @exchange_dead_letter (nil until assigned)
def exchange_dead_letter; @exchange_dead_letter; end

#options ⇒ Object

Returns the value of attribute options.


6
7
8
# File 'lib/lolitra/rabbitmq_bus.rb', line 6

# @return [Object] the value currently held in @options (nil until assigned)
def options; @options; end

#subscribers ⇒ Object

Returns the value of attribute subscribers.


7
8
9
# File 'lib/lolitra/rabbitmq_bus.rb', line 7

# @return [Object] the value currently held in @subscribers (nil until assigned)
def subscribers; @subscribers; end

Instance Method Details

#disconnect(&block) ⇒ Object

[View source]

59
60
61
# File 'lib/lolitra/rabbitmq_bus.rb', line 59

# Closes the bus connection.
#
# @param block optional callback, forwarded unchanged to the connection's #close
# @return [Object] whatever the connection's #close returns
def disconnect(&block)
  active_connection = connection
  active_connection.close(&block)
end

#process_deadletters(handler_class) ⇒ Object

[View source]

92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/lolitra/rabbitmq_bus.rb', line 92

# Opens a channel on the bus connection, declares the handler's dead-letter
# queue and hands that queue to #recursive_pop.
#
# Errors raised inside the channel callback are logged through
# Lolitra::log_exception and not re-raised.
#
# @param handler_class the handler whose dead-letter queue is processed
# @return [true] always
def process_deadletters(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  base_options = SUBSCRIBE_OPTIONS
  create_channel(connection) do |channel|
    begin
      # Merged inside the begin so a bad :queue_dead_params is logged too.
      declare_options = base_options.merge(@options[:queue_dead_params])
      channel.queue(dead_queue_name, declare_options) { |queue| recursive_pop(channel, queue, handler_class) }
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end

#publish(message) ⇒ Object

[View source]

67
68
69
70
# File 'lib/lolitra/rabbitmq_bus.rb', line 67

# Publishes a message on the bus exchange.
#
# The payload is message.marshall, the routing key is the message class's
# message_key, and the current epoch second is attached as :timestamp.
#
# @param message an object responding to #marshall whose class responds to .message_key
# @return [Object] whatever the exchange's #publish returns
def publish(message)
  #TODO: if exchange channel is closed doesn't log anything
  target = exchange
  payload = message.marshall
  routing_key = message.class.message_key
  sent_at = Time.now.to_i
  target.publish(payload, :routing_key => routing_key, :timestamp => sent_at)
end

#purge_deadletters(handler_class) ⇒ Object

[View source]

107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/lolitra/rabbitmq_bus.rb', line 107

# Opens a channel on the bus connection, declares the handler's dead-letter
# queue and passes that queue to #purge_queue.
#
# Errors raised inside the channel callback are logged through
# Lolitra::log_exception and not re-raised.
#
# @param handler_class the handler whose dead-letter queue is purged
# @return [true] always
def purge_deadletters(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  base_options = SUBSCRIBE_OPTIONS
  create_channel(connection) do |channel|
    begin
      # Merged inside the begin so a bad :queue_dead_params is logged too.
      declare_options = base_options.merge(@options[:queue_dead_params])
      channel.queue(dead_queue_name, declare_options) { |queue| purge_queue(queue) }
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end

#remove_next_deadletter(handler_class) ⇒ Object

[View source]

122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/lolitra/rabbitmq_bus.rb', line 122

# Opens a channel on the bus connection, declares the handler's dead-letter
# queue and calls #pop on it once, discarding the result.
#
# Errors raised inside the channel callback are logged through
# Lolitra::log_exception and not re-raised.
#
# @param handler_class the handler whose dead-letter queue is popped
# @return [true] always
def remove_next_deadletter(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  base_options = SUBSCRIBE_OPTIONS
  create_channel(connection) do |channel|
    begin
      # Merged inside the begin so a bad :queue_dead_params is logged too.
      declare_options = base_options.merge(@options[:queue_dead_params])
      channel.queue(dead_queue_name, declare_options) { |queue| queue.pop }
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end

#subscribe(message_class, handler_class) ⇒ Object

[View source]

63
64
65
# File 'lib/lolitra/rabbitmq_bus.rb', line 63

# Subscribes a handler to a message type by delegating to #create_queue
# with the shared SUBSCRIBE_OPTIONS.
#
# @param message_class the message type to receive
# @param handler_class the handler that will consume those messages
# @return [Object] whatever #create_queue returns
def subscribe(message_class, handler_class)
  queue_options = SUBSCRIBE_OPTIONS
  create_queue(message_class, handler_class, queue_options)
end

#unsubscribe(handler_class, &block) ⇒ Object

[View source]

72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/lolitra/rabbitmq_bus.rb', line 72

# Deletes the queue that belongs to a handler.
#
# Opens a channel on the bus connection, declares the handler's queue with
# SUBSCRIBE_OPTIONS and asks the queue to delete itself. When a block is
# given it is called with (handler_class, true) from the delete callback,
# or with (handler_class, false) if issuing the delete raised. Exceptions
# are logged through Lolitra::log_exception and not re-raised.
#
# @param handler_class the handler whose queue is removed
# @param block optional callback receiving (handler_class, success)
# @return [Object] the result of #create_channel, or of the logger call on failure
def unsubscribe(handler_class, &block)
  queue_name = generate_queue_name(handler_class)
  # The callback is optional: without this guard a call with no block would
  # raise NoMethodError inside the delete callback.
  notify = lambda { |success| block.call(handler_class, success) if block }
  begin
    create_channel(self.connection) do |channel|
      channel.queue(queue_name, SUBSCRIBE_OPTIONS) do |queue|
        begin
          queue.delete { notify.call(true) }
        rescue => e
          Lolitra::log_exception(e)
          notify.call(false)
        end
      end
    end
  rescue => e
    Lolitra::log_exception(e)
  end
end