Class: RosettaQueue::Gateway::FanoutExchange

Inherits:
BaseExchange show all
Defined in:
lib/rosetta_queue/adapters/amqp.rb

Instance Method Summary collapse

Methods inherited from BaseExchange

#initialize, #publish_to_exchange

Constructor Details

This class inherits a constructor from RosettaQueue::Gateway::BaseExchange

Instance Method Details

#do_exchange(destination, message_handler) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/rosetta_queue/adapters/amqp.rb', line 113

def do_exchange(destination, message_handler)
  queue     = channel.queue("queue_#{self.object_id}")
  exchange  = channel.fanout(fanout_name_for(destination))

  queue.bind(exchange).subscribe do |msg|
  # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination))).subscribe do |msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.on_message(Filters.process_receiving(msg))
  end        
end

#do_single_exchange(destination, opts = {}) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rosetta_queue/adapters/amqp.rb', line 124

def do_single_exchange(destination, opts={})
  EM.run do
    queue     = channel.queue("queue_#{self.object_id}")
    exchange  = channel.fanout(fanout_name_for(destination))

    queue.bind(exchange).pop do |msg|
    # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination)), opts).pop do |msg|
      RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
      yield Filters.process_receiving(msg)
    end
  end
end