Class: RosettaQueue::Gateway::AmqpAdapter

Inherits:
BaseAdapter
  • Object
show all
Defined in:
lib/rosetta_queue/adapters/amqp.rb

Instance Method Summary collapse

Constructor Details

#initialize(user, pass, host, port = nil) ⇒ AmqpAdapter

Returns a new instance of AmqpAdapter.



19
20
21
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19

# Remembers the connection settings on the adapter instance.
#
# @param user [Object] stored in @user
# @param pass [Object] stored in @pass
# @param host [Object] stored in @host
# @param port [Object, nil] stored in @port; nil when omitted
def initialize(user, pass, host, port = nil)
  @user = user
  @pass = pass
  @host = host
  @port = port
end

Instance Method Details

#disconnect ⇒ Object



23
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23

# Intentionally empty: this adapter does nothing on disconnect and
# returns nil.
def disconnect
end

#exchange_strategy_for(destination) ⇒ Object



42
43
44
45
46
47
48
49
50
51
# File 'lib/rosetta_queue/adapters/amqp.rb', line 42

# Chooses the exchange strategy object for +destination+.
#
# Destinations matching "topic" or "fanout" get a FanoutExchange; every
# other destination (including those matching "queue") gets a
# DirectExchange. Each strategy is built lazily and memoized in its own
# instance variable, so a previously resolved fanout destination can no
# longer cause a queue destination to receive the fanout strategy (and
# vice versa), which happened when both kinds shared a single cache slot.
#
# Only @user, @pass and @host are handed to the strategy constructors;
# @port is not forwarded here.
#
# @param destination [String] destination name matched against the patterns
# @return [FanoutExchange, DirectExchange] the memoized strategy instance
def exchange_strategy_for(destination)
  case destination
  when /topic|fanout/
    @fanout_exchange ||= FanoutExchange.new(@user, @pass, @host)
  else
    @direct_exchange ||= DirectExchange.new(@user, @pass, @host)
  end
end

#receive_once(destination, opts = {}) ⇒ Object



25
26
27
28
29
# File 'lib/rosetta_queue/adapters/amqp.rb', line 25

# Performs a single exchange on +destination+ and hands back the first
# message the strategy yields.
#
# The +return+ inside the block exits this method as soon as a message is
# yielded. If the strategy never yields, the value of do_single_exchange
# itself is returned.
#
# @param destination [String] destination to receive from
# @param opts [Hash] passed through unchanged to do_single_exchange
# @return [Object] the first yielded message
def receive_once(destination, opts = {})
  strategy = exchange_strategy_for(destination)
  strategy.do_single_exchange(destination, opts) { |message| return message }
end

#receive_with(message_handler) ⇒ Object



31
32
33
34
# File 'lib/rosetta_queue/adapters/amqp.rb', line 31

# Resolves the destination for +message_handler+ via destination_for, then
# asks the matching exchange strategy to run do_exchange with that
# destination and the handler.
#
# @param message_handler [Object] handler passed to destination_for and do_exchange
# @return [Object] whatever do_exchange returns
def receive_with(message_handler)
  target = destination_for(message_handler)
  strategy = exchange_strategy_for(target)
  strategy.do_exchange(target, message_handler)
end

#send_message(destination, message, options = nil) ⇒ Object



36
37
38
# File 'lib/rosetta_queue/adapters/amqp.rb', line 36

# Publishes +message+ to +destination+ through the exchange strategy
# selected for that destination.
#
# @param destination [String] destination to publish to
# @param message [Object] payload handed to publish_to_exchange
# @param options [Object, nil] passed through unchanged; nil when omitted
# @return [Object] whatever publish_to_exchange returns
def send_message(destination, message, options = nil)
  strategy = exchange_strategy_for(destination)
  strategy.publish_to_exchange(destination, message, options)
end

#unsubscribe ⇒ Object



40
# File 'lib/rosetta_queue/adapters/amqp.rb', line 40

# Intentionally empty: this adapter does nothing on unsubscribe and
# returns nil.
def unsubscribe
end