Class: RosettaQueue::Gateway::AmqpAdapter
- Inherits:
-
BaseAdapter
- Object
- BaseAdapter
- RosettaQueue::Gateway::AmqpAdapter
- Defined in:
- lib/rosetta_queue/adapters/amqp.rb
Instance Method Summary collapse
- #disconnect ⇒ Object
- #exchange_strategy_for(destination) ⇒ Object
-
#initialize(user, pass, host, port = nil) ⇒ AmqpAdapter
constructor
A new instance of AmqpAdapter.
- #receive_once(destination, opts = {}) ⇒ Object
- #receive_with(message_handler) ⇒ Object
- #send_message(destination, message, options = nil) ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(user, pass, host, port = nil) ⇒ AmqpAdapter
Returns a new instance of AmqpAdapter.
19 20 21 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19 def initialize(user, pass, host, port=nil) @user, @pass, @host, @port = user, pass, host, port end |
Instance Method Details
#disconnect ⇒ Object
23 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23 def disconnect; end |
#exchange_strategy_for(destination) ⇒ Object
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 42 def exchange_strategy_for(destination) case destination when /(topic|fanout)/ @exchange ||= FanoutExchange.new(@user, @pass, @host) when /queue/ @exchange ||= DirectExchange.new(@user, @pass, @host) else @exchange ||= DirectExchange.new(@user, @pass, @host) end end |
#receive_once(destination, opts = {}) ⇒ Object
25 26 27 28 29 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 25 def receive_once(destination, opts={}) exchange_strategy_for(destination).do_single_exchange(destination, opts) do |msg| return msg end end |
#receive_with(message_handler) ⇒ Object
31 32 33 34 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 31 def receive_with() destination = destination_for() exchange_strategy_for(destination).do_exchange(destination, ) end |
#send_message(destination, message, options = nil) ⇒ Object
36 37 38 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 36 def (destination, , =nil) exchange_strategy_for(destination).publish_to_exchange(destination, , ) end |
#unsubscribe ⇒ Object
40 |
# File 'lib/rosetta_queue/adapters/amqp.rb', line 40 def unsubscribe; end |