Class: EventMachine::MQTT::ServerConnection
- Inherits:
-
Connection
- Object
- Connection
- Connection
- EventMachine::MQTT::ServerConnection
- Defined in:
- lib/em-mqtt/server_connection.rb
Constant Summary collapse
- @@clients =
Array.new
Instance Attribute Summary collapse
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#keep_alive ⇒ Object
Returns the value of attribute keep_alive.
-
#last_packet ⇒ Object
Returns the value of attribute last_packet.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#message_id ⇒ Object
Returns the value of attribute message_id.
-
#subscriptions ⇒ Object
Returns the value of attribute subscriptions.
-
#timer ⇒ Object
readonly
Returns the value of attribute timer.
Attributes inherited from Connection
#last_received, #last_sent, #state
Instance Method Summary collapse
- #connect(packet) ⇒ Object
- #disconnect ⇒ Object
-
#initialize(logger) ⇒ ServerConnection
constructor
A new instance of ServerConnection.
- #ping(packet) ⇒ Object
- #post_init ⇒ Object
- #process_packet(packet) ⇒ Object
- #publish(packet) ⇒ Object
- #subscribe(packet) ⇒ Object
- #unbind ⇒ Object
Methods inherited from Connection
#connected?, #receive_data, #send_packet
Constructor Details
#initialize(logger) ⇒ ServerConnection
Returns a new instance of ServerConnection.
15 16 17 |
# File 'lib/em-mqtt/server_connection.rb', line 15
#
# Creates the connection handler and remembers the logger it should write to.
#
# @param logger [Object] stored in @logger; the other methods of this class
#   call #debug and #info on it
def initialize(logger)
  @logger = logger
end
Instance Attribute Details
#client_id ⇒ Object
Returns the value of attribute client_id.
6 7 8 |
# File 'lib/em-mqtt/server_connection.rb', line 6
#
# Reader for the client identifier.
#
# @return [Object, nil] the value held in @client_id (nil after #post_init,
#   the CONNECT packet's client_id once #connect has run)
def client_id
  @client_id
end
#keep_alive ⇒ Object
Returns the value of attribute keep_alive.
8 9 10 |
# File 'lib/em-mqtt/server_connection.rb', line 8
#
# Reader for the keep-alive interval.
#
# @return [Object] the value held in @keep_alive (0 after #post_init; #connect
#   copies the CONNECT packet's keep_alive, logged there as seconds)
def keep_alive
  @keep_alive
end
#last_packet ⇒ Object
Returns the value of attribute last_packet.
7 8 9 |
# File 'lib/em-mqtt/server_connection.rb', line 7
#
# Reader for the last_packet attribute.
#
# @return [Object, nil] the value held in @last_packet
def last_packet
  @last_packet
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
13 14 15 |
# File 'lib/em-mqtt/server_connection.rb', line 13
#
# Read-only access to the logger.
#
# @return [Object] the value held in @logger, as assigned by #initialize
def logger
  @logger
end
#message_id ⇒ Object
Returns the value of attribute message_id.
9 10 11 |
# File 'lib/em-mqtt/server_connection.rb', line 9
#
# Reader for the message_id attribute.
#
# @return [Object, nil] the value held in @message_id (#post_init resets it to 0)
def message_id
  @message_id
end
#subscriptions ⇒ Object
Returns the value of attribute subscriptions.
10 11 12 |
# File 'lib/em-mqtt/server_connection.rb', line 10
#
# Reader for the list of subscribed topics.
#
# @return [Array, nil] the value held in @subscriptions (an empty array after
#   #post_init; #subscribe appends topics to it)
def subscriptions
  @subscriptions
end
#timer ⇒ Object (readonly)
Returns the value of attribute timer.
12 13 14 |
# File 'lib/em-mqtt/server_connection.rb', line 12
#
# Read-only access to the keep-alive timer.
#
# @return [Object, nil] the value held in @timer (nil after #post_init; the
#   periodic timer created by #connect when one is set up)
def timer
  @timer
end
Instance Method Details
#connect(packet) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/em-mqtt/server_connection.rb', line 59
#
# Handles a CONNECT packet: records the client id, acknowledges with a
# CONNACK, marks the connection as connected, registers it in the shared
# client list and, when the client asked for one, starts a keep-alive timer.
#
# @param packet [MQTT::Packet::Connect] the packet; its client_id and
#   keep_alive are read
# @return [Object, nil] the periodic timer when keep-alive is enabled,
#   otherwise nil
def connect(packet)
  # FIXME: check the protocol name and version
  # FIXME: check the client id is between 1 and 23 characters
  self.client_id = packet.client_id

  ## FIXME: disconnect old client with the same ID
  send_packet MQTT::Packet::Connack.new
  @state = :connected
  @@clients << self
  logger.info("#{client_id} is now connected")

  # Setup a keep-alive timer.
  # 0 is truthy in Ruby, so test for a positive value explicitly: a keep-alive
  # of zero would otherwise create a zero-interval timer that disconnects the
  # client as soon as any time has passed since the last packet.
  requested = packet.keep_alive
  if requested && requested > 0
    @keep_alive = requested
    logger.debug("#{client_id}: Setting keep alive timer to #{@keep_alive} seconds")
    # Divide by 2.0 so a one-second keep-alive does not truncate to an
    # interval of zero.
    @timer = EventMachine::PeriodicTimer.new(@keep_alive / 2.0) do
      last_seen = Time.now - @last_received
      if last_seen > @keep_alive * 1.5
        logger.info("Disconnecting '#{client_id}' because it hasn't been seen for #{last_seen} seconds")
        disconnect
      end
    end
  end
end
#disconnect ⇒ Object
84 85 86 87 88 |
# File 'lib/em-mqtt/server_connection.rb', line 84
#
# Logs the closure, flags the connection as disconnected and then closes it.
#
# @return [Object] whatever close_connection returns
def disconnect
  logger.debug("Closing connection to #{client_id}")

  @state = :disconnected
  close_connection
end
#ping(packet) ⇒ Object
90 91 92 |
# File 'lib/em-mqtt/server_connection.rb', line 90
#
# Answers a ping request with a ping response.
#
# @param packet [Object] the request that triggered the reply; it is not read
# @return [Object] whatever send_packet returns
def ping(packet)
  send_packet(MQTT::Packet::Pingresp.new)
end
#post_init ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/em-mqtt/server_connection.rb', line 19
#
# Resets the per-connection state after calling the superclass hook: the
# connection waits for a CONNECT packet, has no client id or timer, zeroed
# keep-alive and message id, and no subscriptions.
#
# @return [Object] whatever logger.debug returns
def post_init
  super

  @state = :wait_connect
  @client_id = @timer = nil
  @keep_alive = @message_id = 0
  @subscriptions = []

  logger.debug("TCP connection opened")
end
#process_packet(packet) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/em-mqtt/server_connection.rb', line 36
#
# Dispatches an incoming packet according to the connection state and the
# packet's exact class.
#
# A CONNECT is accepted only while waiting for one; PINGREQ, SUBSCRIBE and
# PUBLISH only once connected; DISCONNECT in any state. Anything else closes
# the connection and raises.
#
# @param packet [Object] the packet to handle
# @raise [MQTT::ProtocolException] when the packet is not expected in the
#   current state
def process_packet(packet)
  logger.debug("#{client_id}: #{packet.inspect}")

  if state == :wait_connect && packet.class == MQTT::Packet::Connect
    connect(packet)
  elsif state == :connected && packet.class == MQTT::Packet::Pingreq
    ping(packet)
  elsif state == :connected && packet.class == MQTT::Packet::Subscribe
    subscribe(packet)
  elsif state == :connected && packet.class == MQTT::Packet::Publish
    publish(packet)
  elsif packet.class == MQTT::Packet::Disconnect
    logger.info("#{client_id} has disconnected")
    disconnect
  else
    # FIXME: deal with other packet types
    # Build the message first so it reports the state the packet arrived in,
    # then disconnect before raising; a call placed after the raise would
    # never run.
    message = "Wasn't expecting packet of type #{packet.class} when in state #{state}"
    disconnect
    raise MQTT::ProtocolException, message
  end
end
#publish(packet) ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/em-mqtt/server_connection.rb', line 103
#
# Forwards a published packet to every registered client whose subscription
# list contains the packet's topic or the catch-all '#'.
#
# @param packet [Object] the packet to forward; its topic is read
# @return [Array] the shared client list that was iterated
def publish(packet)
  @@clients.each do |client|
    wanted = client.subscriptions.include?(packet.topic) || client.subscriptions.include?('#')
    next unless wanted

    client.send_packet(packet)
  end
end
#subscribe(packet) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/em-mqtt/server_connection.rb', line 94
#
# Adds every topic named in a SUBSCRIBE packet to this connection's
# subscription list and logs the resulting list. The requested QoS values
# are not used.
#
# @param packet [Object] the packet; its topics are iterated as
#   (topic, qos) pairs
# @return [Object] whatever logger.info returns
def subscribe(packet)
  packet.topics.each { |topic, _qos| subscriptions << topic }

  logger.info("#{client_id} has subscriptions: #{subscriptions}")
  # FIXME: send subscribe acknowledgement
end
#unbind ⇒ Object
30 31 32 33 34 |
# File 'lib/em-mqtt/server_connection.rb', line 30
#
# Cleans up when the connection goes away: removes it from the shared client
# list and cancels the keep-alive timer if one was started.
#
# @return [Object] whatever logger.debug returns
def unbind
  @@clients.delete(self)
  @timer.cancel unless @timer.nil?

  logger.debug("TCP connection closed")
end