Class: Klomp::Connection
- Inherits:
-
Object
- Object
- Klomp::Connection
- Defined in:
- lib/klomp/connection.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
-
#initialize(server, options = {}) ⇒ Connection
constructor
A new instance of Connection.
- #publish(queue, body, headers = {}) ⇒ Object
- #reconnect ⇒ Object
- #subscribe(queue, subscriber = nil, &block) ⇒ Object
- #unsubscribe(queue) ⇒ Object
Constructor Details
#initialize(server, options = {}) ⇒ Connection
Returns a new instance of Connection.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/klomp/connection.rb', line 10 def initialize(server, ={}) @options = if server =~ /^stomp:\/\// uri = URI.parse server host, port = uri.host, uri.port @options['login'] = uri.user if uri.user @options['passcode'] = uri.password if uri.password if uri.query && !uri.query.empty? uri.query.split('&').each {|pair| k, v = pair.split('=', 2); @options[k] = v } end else address = server.split ':' port, host = address.pop.to_i, address.pop @options['host'] ||= address.pop unless address.empty? end @options['server'] = [host, port] @options['host'] ||= host @subscriptions = {} @logger = ['logger'] @select_timeout = ['select_timeout'] || 0.5 connect end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def @options end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def subscriptions @subscriptions end |
Instance Method Details
#closed? ⇒ Boolean
60 |
# File 'lib/klomp/connection.rb', line 60 def closed?() @closing && @socket.nil? end |
#connected? ⇒ Boolean
59 |
# File 'lib/klomp/connection.rb', line 59 def connected?() @socket end |
#disconnect ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/klomp/connection.rb', line 62 def disconnect close! stop_subscriber_thread frame = Frames::Disconnect.new write frame rescue nil @socket.close rescue nil @socket = nil frame end |
#publish(queue, body, headers = {}) ⇒ Object
35 36 37 |
# File 'lib/klomp/connection.rb', line 35 def publish(queue, body, headers={}) write Frames::Send.new(queue, body, headers) end |
#reconnect ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/klomp/connection.rb', line 72 def reconnect return if connected? logger.warn "reconnect server=#{['server'].join(':')}" if logger connect subs = subscriptions.dup subscriptions.clear subs.each {|queue, subscriber| subscribe(queue, subscriber) } @sentinel = nil ensure @subscriptions = subs if subs && subs.size != @subscriptions.size end |
#subscribe(queue, subscriber = nil, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/klomp/connection.rb', line 39 def subscribe(queue, subscriber = nil, &block) raise Klomp::Error, "no subscriber provided" unless subscriber || block raise Klomp::Error, "subscriber does not respond to #call" if subscriber && !subscriber.respond_to?(:call) previous = subscriptions[queue] subscriptions[queue] = subscriber || block frame = Frames::Subscribe.new(queue) if previous frame.previous_subscriber = previous else write frame end start_subscriber_thread frame end |
#unsubscribe(queue) ⇒ Object
54 55 56 57 |
# File 'lib/klomp/connection.rb', line 54 def unsubscribe(queue) queue = queue.headers['destination'] if Frames::Subscribe === queue write Frames::Unsubscribe.new(queue) if subscriptions.delete queue end |