Class: RosettaQueue::Gateway::StompAdapter

Inherits:
BaseAdapter
  • Object
show all
Defined in:
lib/rosetta_queue/adapters/stomp.rb

Instance Method Summary collapse

Constructor Details

#initialize(user, password, host, port) ⇒ StompAdapter

Returns a new instance of StompAdapter.


12
13
14
# File 'lib/rosetta_queue/adapters/stomp.rb', line 12

# Opens the STOMP connection used by every other method of this adapter and
# keeps it in @conn.
#
# @param user     login forwarded to Stomp::Connection.open
# @param password passcode forwarded to Stomp::Connection.open
# @param host     broker host forwarded to Stomp::Connection.open
# @param port     broker port forwarded to Stomp::Connection.open
def initialize(user, password, host, port)
  # NOTE(review): the fifth positional argument is presumably the stomp gem's
  # `reliable` flag -- confirm against the installed gem version.
  reliable = true
  @conn = Stomp::Connection.open(user, password, host, port, reliable)
end

Instance Method Details

#ack(msg) ⇒ Object


8
9
10
# File 'lib/rosetta_queue/adapters/stomp.rb', line 8

# Acknowledges a received message on the underlying connection.
#
# @param msg message object responding to #headers; its "message-id" header
#   identifies what is acknowledged
# @return whatever @conn.ack returns
def ack(msg)
  message_id = msg.headers["message-id"]
  @conn.ack(message_id)
end

#disconnect(message_handler) ⇒ Object


16
17
18
19
# File 'lib/rosetta_queue/adapters/stomp.rb', line 16

# Unsubscribes from the handler's destination, then closes the connection.
#
# @param message_handler handler whose destination is resolved through
#   destination_for (defined outside this listing)
# @return whatever @conn.disconnect returns
def disconnect(message_handler)
  destination = destination_for(message_handler)
  unsubscribe(destination)
  @conn.disconnect
end

#receive(options) ⇒ Object


21
22
23
24
25
# File 'lib/rosetta_queue/adapters/stomp.rb', line 21

# Takes the next message from the connection, acknowledging it when the
# options carry an :ack entry.
#
# @param options hash-like object; any non-nil value under :ack (including
#   false) triggers an acknowledgement
# @return the message returned by @conn.receive
def receive(options)
  incoming = @conn.receive
  ack_requested = !options[:ack].nil?
  ack(incoming) if ack_requested
  incoming
end

#receive_once(destination, opts) ⇒ Object


27
28
29
30
31
32
33
# File 'lib/rosetta_queue/adapters/stomp.rb', line 27

# Subscribes to +destination+, takes a single message, unsubscribes, and
# returns the filtered message body.
#
# The subscription is released in an +ensure+ so that a failure while
# receiving (an exception from #receive, or a nil message that has no #body)
# does not leave the one-shot subscription behind.
#
# @param destination destination name passed to #subscribe / #unsubscribe
# @param opts        options passed to both #subscribe and #receive
# @return the result of filter_receiving (defined outside this listing)
#   applied to the message body
def receive_once(destination, opts)
  subscribe(destination, opts)
  begin
    msg = receive(opts).body
  ensure
    # Runs on both the success and the failure path.
    unsubscribe(destination)
  end
  RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
  filter_receiving(msg)
end

#receive_with(message_handler) ⇒ Object


35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rosetta_queue/adapters/stomp.rb', line 35

# Subscribes to the handler's destination and, inside the block given to
# +running+, receives messages and hands each filtered body to the handler.
#
# Thread.current[:processing] is true while a message is being logged,
# filtered and handled, and is reset to false afterwards -- also when the
# handler (or logging/filtering) raises, so the flag cannot stay stuck at
# true after a failure.
# NOTE(review): +running+, +options_for+, +destination_for+ and
# +filter_receiving+ are defined outside this listing; the consumer of the
# :processing flag is not visible here.
#
# @param message_handler object responding to #on_message; also used to
#   resolve subscription options and destination
# @return whatever +running+ returns
def receive_with(message_handler)
  options = options_for(message_handler)
  destination = destination_for(message_handler)
  @conn.subscribe(destination, options)

  running do
    msg = receive(options).body
    Thread.current[:processing] = true
    begin
      RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
      message_handler.on_message(filter_receiving(msg))
    ensure
      # Always clear the flag, even if the handler blew up.
      Thread.current[:processing] = false
    end
  end
end

#send_message(destination, message, options) ⇒ Object


49
50
51
52
# File 'lib/rosetta_queue/adapters/stomp.rb', line 49

# Logs the outgoing message at info level, then publishes it through the
# connection.
#
# @param destination destination name passed to @conn.send
# @param message     payload passed to @conn.send
# @param options     options passed to @conn.send
# @return whatever @conn.send returns
def send_message(destination, message, options)
  logger = RosettaQueue.logger
  logger.info("Publishing to #{destination} :: #{message}")
  @conn.send(destination, message, options)
end

#subscribe(destination, options) ⇒ Object


54
55
56
# File 'lib/rosetta_queue/adapters/stomp.rb', line 54

# Subscribes the underlying connection to a destination.
#
# @param destination destination name passed to @conn.subscribe
# @param options     subscription options passed to @conn.subscribe
# @return whatever @conn.subscribe returns
def subscribe(destination, options)
  connection = @conn
  connection.subscribe(destination, options)
end

#unsubscribe(destination) ⇒ Object


58
59
60
# File 'lib/rosetta_queue/adapters/stomp.rb', line 58

# Cancels the underlying connection's subscription to a destination.
#
# @param destination destination name passed to @conn.unsubscribe
# @return whatever @conn.unsubscribe returns
def unsubscribe(destination)
  connection = @conn
  connection.unsubscribe(destination)
end