Class: RosettaQueue::Gateway::FanoutExchange
- Inherits: BaseExchange
  - Object
  - BaseExchange
  - RosettaQueue::Gateway::FanoutExchange
- Defined in: lib/rosetta_queue/adapters/amqp.rb
Instance Method Summary
- #do_exchange(destination, message_handler) ⇒ Object
- #do_single_exchange(destination, opts = {}) ⇒ Object
Methods inherited from BaseExchange
#initialize, #publish_to_exchange
Constructor Details
This class inherits a constructor from RosettaQueue::Gateway::BaseExchange
Instance Method Details
#do_exchange(destination, message_handler) ⇒ Object
# File 'lib/rosetta_queue/adapters/amqp.rb', line 113

def do_exchange(destination, message_handler)
  queue = channel.queue("queue_#{self.object_id}")
  exchange = channel.fanout(fanout_name_for(destination))

  queue.bind(exchange).subscribe do |msg|
    # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination))).subscribe do |msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.(Filters.process_receiving(msg))
  end
end
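The flow in do_exchange (declare a per-consumer queue, bind it to the destination's fanout exchange, then subscribe) can be sketched without a broker. The classes below (SketchExchange, SketchQueue, CollectingHandler) are hypothetical in-memory stand-ins, not the amqp gem's API, and the handler's on_message method name is an assumption made for illustration. The sketch only aims to show why each consumer gets its own queue: a fanout exchange copies every message to every bound queue.

```ruby
# In-memory stand-ins for illustration only; these are NOT the amqp gem's classes.
class SketchExchange
  def initialize
    @queues = []
  end

  # Register a queue so it receives a copy of every published message.
  def attach(queue)
    @queues << queue
  end

  # Fanout semantics: every bound queue gets the message.
  def publish(msg)
    @queues.each { |q| q.deliver(msg) }
  end
end

class SketchQueue
  attr_reader :name

  def initialize(name)
    @name = name
    @subscriber = nil
  end

  # Returns self so that bind(...).subscribe { ... } can be chained.
  def bind(exchange)
    exchange.attach(self)
    self
  end

  def subscribe(&block)
    @subscriber = block
    self
  end

  def deliver(msg)
    @subscriber&.call(msg)
  end
end

# Hypothetical handler; the on_message name is an assumption.
class CollectingHandler
  attr_reader :received

  def initialize
    @received = []
  end

  def on_message(msg)
    @received << msg
  end
end

exchange = SketchExchange.new
handlers = [CollectingHandler.new, CollectingHandler.new]

handlers.each do |handler|
  # One queue per consumer, with an object-id-based name similar to the listing.
  queue = SketchQueue.new("queue_#{handler.object_id}")
  queue.bind(exchange).subscribe { |msg| handler.on_message(msg) }
end

exchange.publish("price update")
handlers.each { |h| puts h.received.inspect }
```

Because each handler subscribes through its own queue, both handlers end up with a copy of the single published message. If the consumers shared one queue instead, the messages would be split between them rather than duplicated.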
#do_single_exchange(destination, opts = {}) ⇒ Object
# File 'lib/rosetta_queue/adapters/amqp.rb', line 124

def do_single_exchange(destination, opts={})
  EM.run do
    queue = channel.queue("queue_#{self.object_id}")
    exchange = channel.fanout(fanout_name_for(destination))

    queue.bind(exchange).pop do |msg|
      # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination)), opts).pop do |msg|
      RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
      yield Filters.process_receiving(msg)
    end
  end
end
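do_single_exchange uses pop instead of subscribe and yields the result to the caller's block. A rough sketch of that shape follows, assuming pop hands over at most one waiting message per call. OneShotQueue and receive_once are hypothetical names for illustration, not part of RosettaQueue or the amqp gem, and the filtering step is omitted.

```ruby
# Hypothetical in-memory queue; NOT the amqp gem's API.
class OneShotQueue
  def initialize
    @messages = []
  end

  def push(msg)
    @messages << msg
    self
  end

  # Assumed pop semantics: pass at most one waiting message to the block.
  def pop
    yield @messages.shift unless @messages.empty?
    self
  end

  def size
    @messages.size
  end
end

# Same shape as do_single_exchange: pop once, then yield to the caller's block.
def receive_once(queue)
  queue.pop { |msg| yield msg }
end

queue = OneShotQueue.new
queue.push("first").push("second")

seen = []
receive_once(queue) { |msg| seen << msg }

puts seen.inspect
puts queue.size
```

Only the first message reaches the caller's block; the second stays queued until another pop. A subscription, by contrast, would keep invoking its block for each message that arrives.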