Class: MqttRails::Handler
- Inherits:
-
Object
- Object
- MqttRails::Handler
- Defined in:
- lib/mqtt_rails/handler.rb
Instance Attribute Summary collapse
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#last_packet_received_at ⇒ Object
readonly
Returns the value of attribute last_packet_received_at.
-
#last_pingresp_received_at ⇒ Object
readonly
Returns the value of attribute last_pingresp_received_at.
-
#registered_callback ⇒ Object
readonly
Returns the value of attribute registered_callback.
Instance Method Summary collapse
- #check_callback(packet) ⇒ Object
- #clean_session?(session_flag) ⇒ Boolean
- #clear_topic_callback(topic) ⇒ Object
- #config_pubsub(publisher, subscriber) ⇒ Object
- #handle_connack(packet) ⇒ Object
- #handle_connack_accepted(session_flag) ⇒ Object
- #handle_packet(packet) ⇒ Object
- #handle_pingresp(_packet) ⇒ Object
- #handle_puback(packet) ⇒ Object
- #handle_pubcomp(packet) ⇒ Object
- #handle_publish(packet) ⇒ Object
- #handle_pubrec(packet) ⇒ Object
- #handle_pubrel(packet) ⇒ Object
- #handle_suback(packet) ⇒ Object
- #handle_unsuback(packet) ⇒ Object
-
#initialize ⇒ Handler
constructor
A new instance of Handler.
- #new_session?(session_flag) ⇒ Boolean
- #old_session?(session_flag) ⇒ Boolean
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #packet_type(packet) ⇒ Object
- #receive_packet ⇒ Object
- #register_topic_callback(topic, callback, &block) ⇒ Object
- #socket=(socket) ⇒ Object
Constructor Details
#initialize ⇒ Handler
Returns a new instance of Handler.
23 24 25 26 27 |
# File 'lib/mqtt_rails/handler.rb', line 23 def initialize @registered_callback = [] @publisher = nil @subscriber = nil end |
Instance Attribute Details
#clean_session ⇒ Object
Returns the value of attribute clean_session.
21 22 23 |
# File 'lib/mqtt_rails/handler.rb', line 21 def clean_session @clean_session end |
#last_packet_received_at ⇒ Object (readonly)
Returns the value of attribute last_packet_received_at.
19 20 21 |
# File 'lib/mqtt_rails/handler.rb', line 19 def last_packet_received_at @last_packet_received_at end |
#last_pingresp_received_at ⇒ Object (readonly)
Returns the value of attribute last_pingresp_received_at.
20 21 22 |
# File 'lib/mqtt_rails/handler.rb', line 20 def last_pingresp_received_at @last_pingresp_received_at end |
#registered_callback ⇒ Object (readonly)
Returns the value of attribute registered_callback.
18 19 20 |
# File 'lib/mqtt_rails/handler.rb', line 18 def registered_callback @registered_callback end |
Instance Method Details
#check_callback(packet) ⇒ Object
262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/mqtt_rails/handler.rb', line 262 def check_callback(packet) callbacks = [] @registered_callback.each do |reccord| callbacks.push(reccord.last) if MqttRails.match_filter(packet.topic, reccord.first) end unless callbacks.empty? callbacks.each do |callback| callback.call(packet) end end end |
#clean_session?(session_flag) ⇒ Boolean
108 109 110 111 112 |
# File 'lib/mqtt_rails/handler.rb', line 108 def clean_session?(session_flag) if @clean_session && !session_flag Rails.logger.info("No previous session found by server, starting a new one.") end end |
#clear_topic_callback(topic) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/mqtt_rails/handler.rb', line 74 def clear_topic_callback(topic) if topic.nil? Rails.logger.error("The topics where the callback is trying to be unregistered have been found nil.") raise ArgumentError end @registered_callback.delete_if { |pair| pair.first == topic } MQTT_ERR_SUCCESS end |
#config_pubsub(publisher, subscriber) ⇒ Object
29 30 31 32 |
# File 'lib/mqtt_rails/handler.rb', line 29 def config_pubsub(publisher, subscriber) @publisher = publisher @subscriber = subscriber end |
#handle_connack(packet) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/mqtt_rails/handler.rb', line 83 def handle_connack(packet) if packet.return_code == 0x00 Rails.logger.info(packet.return_msg) @last_pingresp_received_at = Time.now handle_connack_accepted(packet.session_present) else Rails.logger.warn(packet.return_msg) return MQTT_CS_DISCONNECT end @on_connack.call(packet) unless @on_connack.nil? MQTT_CS_CONNECTED end |
#handle_connack_accepted(session_flag) ⇒ Object
96 97 98 99 100 |
# File 'lib/mqtt_rails/handler.rb', line 96 def handle_connack_accepted(session_flag) clean_session?(session_flag) new_session?(session_flag) old_session?(session_flag) end |
#handle_packet(packet) ⇒ Object
54 55 56 57 58 |
# File 'lib/mqtt_rails/handler.rb', line 54 def handle_packet(packet) Rails.logger.info("New packet #{packet.class} received.") type = packet_type(packet) self.send("handle_#{type}", packet) end |
#handle_pingresp(_packet) ⇒ Object
120 121 122 |
# File 'lib/mqtt_rails/handler.rb', line 120 def handle_pingresp(_packet) @last_pingresp_received_at = Time.now end |
#handle_puback(packet) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/mqtt_rails/handler.rb', line 152 def handle_puback(packet) id = packet.id if @publisher.do_puback(id) == MQTT_ERR_SUCCESS @on_puback.call(packet) unless @on_puback.nil? end end |
#handle_pubcomp(packet) ⇒ Object
173 174 175 176 177 178 |
# File 'lib/mqtt_rails/handler.rb', line 173 def handle_pubcomp(packet) id = packet.id if @publisher.do_pubcomp(id) == MQTT_ERR_SUCCESS @on_pubcomp.call(packet) unless @on_pubcomp.nil? end end |
#handle_publish(packet) ⇒ Object
143 144 145 146 147 148 149 150 |
# File 'lib/mqtt_rails/handler.rb', line 143 def handle_publish(packet) id = packet.id qos = packet.qos if @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS @on_message.call(packet) unless @on_message.nil? check_callback(packet) end end |
#handle_pubrec(packet) ⇒ Object
159 160 161 162 163 164 |
# File 'lib/mqtt_rails/handler.rb', line 159 def handle_pubrec(packet) id = packet.id if @publisher.do_pubrec(id) == MQTT_ERR_SUCCESS @on_pubrec.call(packet) unless @on_pubrec.nil? end end |
#handle_pubrel(packet) ⇒ Object
166 167 168 169 170 171 |
# File 'lib/mqtt_rails/handler.rb', line 166 def handle_pubrel(packet) id = packet.id if @publisher.do_pubrel(id) == MQTT_ERR_SUCCESS @on_pubrel.call(packet) unless @on_pubrel.nil? end end |
#handle_suback(packet) ⇒ Object
124 125 126 127 128 129 130 131 132 |
# File 'lib/mqtt_rails/handler.rb', line 124 def handle_suback(packet) max_qos = packet.return_codes id = packet.id topics = [] topics = @subscriber.add_subscription(max_qos, id, topics) unless topics.empty? @on_suback.call(topics) unless @on_suback.nil? end end |
#handle_unsuback(packet) ⇒ Object
134 135 136 137 138 139 140 141 |
# File 'lib/mqtt_rails/handler.rb', line 134 def handle_unsuback(packet) id = packet.id topics = [] topics = @subscriber.remove_subscription(id, topics) unless topics.empty? @on_unsuback.call(topics) unless @on_unsuback.nil? end end |
#new_session?(session_flag) ⇒ Boolean
102 103 104 105 106 |
# File 'lib/mqtt_rails/handler.rb', line 102 def new_session?(session_flag) if !@clean_session && !session_flag Rails.logger.info("New session created for the client.") end end |
#old_session?(session_flag) ⇒ Boolean
114 115 116 117 118 |
# File 'lib/mqtt_rails/handler.rb', line 114 def old_session?(session_flag) if !@clean_session && session_flag Rails.logger.info("Previous session restored by the server.") end end |
#on_connack(&block) ⇒ Object
180 181 182 183 |
# File 'lib/mqtt_rails/handler.rb', line 180 def on_connack(&block) @on_connack = block if block_given? @on_connack end |
#on_connack=(callback) ⇒ Object
220 221 222 |
# File 'lib/mqtt_rails/handler.rb', line 220 def on_connack=(callback) @on_connack = callback if callback.is_a?(Proc) end |
#on_message(&block) ⇒ Object
215 216 217 218 |
# File 'lib/mqtt_rails/handler.rb', line 215 def (&block) @on_message = block if block_given? @on_message end |
#on_message=(callback) ⇒ Object
248 249 250 |
# File 'lib/mqtt_rails/handler.rb', line 248 def (callback) @on_message = callback if callback.is_a?(Proc) end |
#on_puback(&block) ⇒ Object
195 196 197 198 |
# File 'lib/mqtt_rails/handler.rb', line 195 def on_puback(&block) @on_puback = block if block_given? @on_puback end |
#on_puback=(callback) ⇒ Object
232 233 234 |
# File 'lib/mqtt_rails/handler.rb', line 232 def on_puback=(callback) @on_puback = callback if callback.is_a?(Proc) end |
#on_pubcomp(&block) ⇒ Object
210 211 212 213 |
# File 'lib/mqtt_rails/handler.rb', line 210 def on_pubcomp(&block) @on_pubcomp = block if block_given? @on_pubcomp end |
#on_pubcomp=(callback) ⇒ Object
244 245 246 |
# File 'lib/mqtt_rails/handler.rb', line 244 def on_pubcomp=(callback) @on_pubcomp = callback if callback.is_a?(Proc) end |
#on_pubrec(&block) ⇒ Object
200 201 202 203 |
# File 'lib/mqtt_rails/handler.rb', line 200 def on_pubrec(&block) @on_pubrec = block if block_given? @on_pubrec end |
#on_pubrec=(callback) ⇒ Object
236 237 238 |
# File 'lib/mqtt_rails/handler.rb', line 236 def on_pubrec=(callback) @on_pubrec = callback if callback.is_a?(Proc) end |
#on_pubrel(&block) ⇒ Object
205 206 207 208 |
# File 'lib/mqtt_rails/handler.rb', line 205 def on_pubrel(&block) @on_pubrel = block if block_given? @on_pubrel end |
#on_pubrel=(callback) ⇒ Object
240 241 242 |
# File 'lib/mqtt_rails/handler.rb', line 240 def on_pubrel=(callback) @on_pubrel = callback if callback.is_a?(Proc) end |
#on_suback(&block) ⇒ Object
185 186 187 188 |
# File 'lib/mqtt_rails/handler.rb', line 185 def on_suback(&block) @on_suback = block if block_given? @on_suback end |
#on_suback=(callback) ⇒ Object
224 225 226 |
# File 'lib/mqtt_rails/handler.rb', line 224 def on_suback=(callback) @on_suback = callback if callback.is_a?(Proc) end |
#on_unsuback(&block) ⇒ Object
190 191 192 193 |
# File 'lib/mqtt_rails/handler.rb', line 190 def on_unsuback(&block) @on_unsuback = block if block_given? @on_unsuback end |
#on_unsuback=(callback) ⇒ Object
228 229 230 |
# File 'lib/mqtt_rails/handler.rb', line 228 def on_unsuback=(callback) @on_unsuback = callback if callback.is_a?(Proc) end |
#packet_type(packet) ⇒ Object
252 253 254 255 256 257 258 259 260 |
# File 'lib/mqtt_rails/handler.rb', line 252 def packet_type(packet) type = packet.class if MqttRails::PACKET_TYPES[3..13].include?(type) type.to_s.split('::').last.downcase else Rails.logger.error("Received an unexpeceted packet: #{packet}.") raise PacketException.new('Invalid packet type id') end end |
#receive_packet ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/mqtt_rails/handler.rb', line 38 def receive_packet result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) unless @socket.nil? || @socket.closed? unless result.nil? packet = MqttRails::Packet::Base.read(@socket) unless packet.nil? @last_packet_received_at = Time.now if packet.is_a?(MqttRails::Packet::Connack) return handle_connack(packet) else handle_packet(packet) end end end result end |
#register_topic_callback(topic, callback, &block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/mqtt_rails/handler.rb', line 60 def register_topic_callback(topic, callback, &block) if topic.nil? Rails.logger.error("The topics where the callback is trying to be registered have been found nil.") raise ArgumentError end clear_topic_callback(topic) if block_given? @registered_callback.push([topic, block]) elsif !(callback.nil?) && callback.is_a?(Proc) @registered_callback.push([topic, callback]) end MQTT_ERR_SUCCESS end |
#socket=(socket) ⇒ Object
34 35 36 |
# File 'lib/mqtt_rails/handler.rb', line 34 def socket=(socket) @socket = socket end |