Class: MqttRails::Handler

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt_rails/handler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHandler

Returns a new instance of Handler.



23
24
25
26
27
# File 'lib/mqtt_rails/handler.rb', line 23

def initialize
  @registered_callback = []
  @publisher           = nil
  @subscriber          = nil
end

Instance Attribute Details

#clean_sessionObject

Returns the value of attribute clean_session.



21
22
23
# File 'lib/mqtt_rails/handler.rb', line 21

def clean_session
  @clean_session
end

#last_packet_received_atObject (readonly)

Returns the value of attribute last_packet_received_at.



19
20
21
# File 'lib/mqtt_rails/handler.rb', line 19

def last_packet_received_at
  @last_packet_received_at
end

#last_pingresp_received_atObject (readonly)

Returns the value of attribute last_pingresp_received_at.



20
21
22
# File 'lib/mqtt_rails/handler.rb', line 20

def last_pingresp_received_at
  @last_pingresp_received_at
end

#registered_callbackObject (readonly)

Returns the value of attribute registered_callback.



18
19
20
# File 'lib/mqtt_rails/handler.rb', line 18

def registered_callback
  @registered_callback
end

Instance Method Details

#check_callback(packet) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
# File 'lib/mqtt_rails/handler.rb', line 262

def check_callback(packet)
  callbacks = []
  @registered_callback.each do |reccord|
    callbacks.push(reccord.last) if MqttRails.match_filter(packet.topic, reccord.first)
  end
  unless callbacks.empty?
    callbacks.each do |callback|
      callback.call(packet)
    end
  end
end

#clean_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
111
112
# File 'lib/mqtt_rails/handler.rb', line 108

def clean_session?(session_flag)
  if @clean_session && !session_flag
    Rails.logger.info("No previous session found by server, starting a new one.")
  end
end

#clear_topic_callback(topic) ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/mqtt_rails/handler.rb', line 74

def clear_topic_callback(topic)
  if topic.nil?
    Rails.logger.error("The topics where the callback is trying to be unregistered have been found nil.")
    raise ArgumentError
  end
  @registered_callback.delete_if { |pair| pair.first == topic }
  MQTT_ERR_SUCCESS
end

#config_pubsub(publisher, subscriber) ⇒ Object



29
30
31
32
# File 'lib/mqtt_rails/handler.rb', line 29

def config_pubsub(publisher, subscriber)
  @publisher = publisher
  @subscriber = subscriber
end

#handle_connack(packet) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mqtt_rails/handler.rb', line 83

def handle_connack(packet)
  if packet.return_code == 0x00
    Rails.logger.info(packet.return_msg)
    @last_pingresp_received_at = Time.now
    handle_connack_accepted(packet.session_present)
  else
    Rails.logger.warn(packet.return_msg)
    return MQTT_CS_DISCONNECT
  end
  @on_connack.call(packet) unless @on_connack.nil?
  MQTT_CS_CONNECTED
end

#handle_connack_accepted(session_flag) ⇒ Object



96
97
98
99
100
# File 'lib/mqtt_rails/handler.rb', line 96

def handle_connack_accepted(session_flag)
  clean_session?(session_flag)
  new_session?(session_flag)
  old_session?(session_flag)
end

#handle_packet(packet) ⇒ Object



54
55
56
57
58
# File 'lib/mqtt_rails/handler.rb', line 54

def handle_packet(packet)
  Rails.logger.info("New packet #{packet.class} received.")
  type = packet_type(packet)
  self.send("handle_#{type}", packet)
end

#handle_pingresp(_packet) ⇒ Object



120
121
122
# File 'lib/mqtt_rails/handler.rb', line 120

def handle_pingresp(_packet)
  @last_pingresp_received_at = Time.now
end

#handle_puback(packet) ⇒ Object



152
153
154
155
156
157
# File 'lib/mqtt_rails/handler.rb', line 152

def handle_puback(packet)
  id = packet.id
  if @publisher.do_puback(id) == MQTT_ERR_SUCCESS
    @on_puback.call(packet) unless @on_puback.nil?
  end
end

#handle_pubcomp(packet) ⇒ Object



173
174
175
176
177
178
# File 'lib/mqtt_rails/handler.rb', line 173

def handle_pubcomp(packet)
  id = packet.id
  if @publisher.do_pubcomp(id) == MQTT_ERR_SUCCESS
    @on_pubcomp.call(packet) unless @on_pubcomp.nil?
  end
end

#handle_publish(packet) ⇒ Object



143
144
145
146
147
148
149
150
# File 'lib/mqtt_rails/handler.rb', line 143

def handle_publish(packet)
  id = packet.id
  qos = packet.qos
  if @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS
    @on_message.call(packet) unless @on_message.nil?
    check_callback(packet)
  end
end

#handle_pubrec(packet) ⇒ Object



159
160
161
162
163
164
# File 'lib/mqtt_rails/handler.rb', line 159

def handle_pubrec(packet)
  id = packet.id
  if @publisher.do_pubrec(id) == MQTT_ERR_SUCCESS
    @on_pubrec.call(packet) unless @on_pubrec.nil?
  end
end

#handle_pubrel(packet) ⇒ Object



166
167
168
169
170
171
# File 'lib/mqtt_rails/handler.rb', line 166

def handle_pubrel(packet)
  id = packet.id
  if @publisher.do_pubrel(id) == MQTT_ERR_SUCCESS
    @on_pubrel.call(packet) unless @on_pubrel.nil?
  end
end

#handle_suback(packet) ⇒ Object



124
125
126
127
128
129
130
131
132
# File 'lib/mqtt_rails/handler.rb', line 124

def handle_suback(packet)
  max_qos = packet.return_codes
  id      = packet.id
  topics  = []
  topics  = @subscriber.add_subscription(max_qos, id, topics)
  unless topics.empty?
    @on_suback.call(topics) unless @on_suback.nil?
  end
end

#handle_unsuback(packet) ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/mqtt_rails/handler.rb', line 134

def handle_unsuback(packet)
  id = packet.id
  topics = []
  topics = @subscriber.remove_subscription(id, topics)
  unless topics.empty?
    @on_unsuback.call(topics) unless @on_unsuback.nil?
  end
end

#new_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
105
106
# File 'lib/mqtt_rails/handler.rb', line 102

def new_session?(session_flag)
  if !@clean_session && !session_flag
    Rails.logger.info("New session created for the client.")
  end
end

#old_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
117
118
# File 'lib/mqtt_rails/handler.rb', line 114

def old_session?(session_flag)
  if !@clean_session && session_flag
    Rails.logger.info("Previous session restored by the server.")
  end
end

#on_connack(&block) ⇒ Object



180
181
182
183
# File 'lib/mqtt_rails/handler.rb', line 180

def on_connack(&block)
  @on_connack = block if block_given?
  @on_connack
end

#on_connack=(callback) ⇒ Object



220
221
222
# File 'lib/mqtt_rails/handler.rb', line 220

def on_connack=(callback)
  @on_connack = callback if callback.is_a?(Proc)
end

#on_message(&block) ⇒ Object



215
216
217
218
# File 'lib/mqtt_rails/handler.rb', line 215

def on_message(&block)
  @on_message = block if block_given?
  @on_message
end

#on_message=(callback) ⇒ Object



248
249
250
# File 'lib/mqtt_rails/handler.rb', line 248

def on_message=(callback)
  @on_message = callback if callback.is_a?(Proc)
end

#on_puback(&block) ⇒ Object



195
196
197
198
# File 'lib/mqtt_rails/handler.rb', line 195

def on_puback(&block)
  @on_puback = block if block_given?
  @on_puback
end

#on_puback=(callback) ⇒ Object



232
233
234
# File 'lib/mqtt_rails/handler.rb', line 232

def on_puback=(callback)
  @on_puback = callback if callback.is_a?(Proc)
end

#on_pubcomp(&block) ⇒ Object



210
211
212
213
# File 'lib/mqtt_rails/handler.rb', line 210

def on_pubcomp(&block)
  @on_pubcomp = block if block_given?
  @on_pubcomp
end

#on_pubcomp=(callback) ⇒ Object



244
245
246
# File 'lib/mqtt_rails/handler.rb', line 244

def on_pubcomp=(callback)
  @on_pubcomp = callback if callback.is_a?(Proc)
end

#on_pubrec(&block) ⇒ Object



200
201
202
203
# File 'lib/mqtt_rails/handler.rb', line 200

def on_pubrec(&block)
  @on_pubrec = block if block_given?
  @on_pubrec
end

#on_pubrec=(callback) ⇒ Object



236
237
238
# File 'lib/mqtt_rails/handler.rb', line 236

def on_pubrec=(callback)
  @on_pubrec = callback if callback.is_a?(Proc)
end

#on_pubrel(&block) ⇒ Object



205
206
207
208
# File 'lib/mqtt_rails/handler.rb', line 205

def on_pubrel(&block)
  @on_pubrel = block if block_given?
  @on_pubrel
end

#on_pubrel=(callback) ⇒ Object



240
241
242
# File 'lib/mqtt_rails/handler.rb', line 240

def on_pubrel=(callback)
  @on_pubrel = callback if callback.is_a?(Proc)
end

#on_suback(&block) ⇒ Object



185
186
187
188
# File 'lib/mqtt_rails/handler.rb', line 185

def on_suback(&block)
  @on_suback = block if block_given?
  @on_suback
end

#on_suback=(callback) ⇒ Object



224
225
226
# File 'lib/mqtt_rails/handler.rb', line 224

def on_suback=(callback)
  @on_suback = callback if callback.is_a?(Proc)
end

#on_unsuback(&block) ⇒ Object



190
191
192
193
# File 'lib/mqtt_rails/handler.rb', line 190

def on_unsuback(&block)
  @on_unsuback = block if block_given?
  @on_unsuback
end

#on_unsuback=(callback) ⇒ Object



228
229
230
# File 'lib/mqtt_rails/handler.rb', line 228

def on_unsuback=(callback)
  @on_unsuback = callback if callback.is_a?(Proc)
end

#packet_type(packet) ⇒ Object



252
253
254
255
256
257
258
259
260
# File 'lib/mqtt_rails/handler.rb', line 252

def packet_type(packet)
  type = packet.class
  if MqttRails::PACKET_TYPES[3..13].include?(type)
    type.to_s.split('::').last.downcase
  else
    Rails.logger.error("Received an unexpeceted packet: #{packet}.")
    raise PacketException.new('Invalid packet type id')
  end
end

#receive_packetObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/mqtt_rails/handler.rb', line 38

def receive_packet
  result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) unless @socket.nil? || @socket.closed?
  unless result.nil?
    packet = MqttRails::Packet::Base.read(@socket)
    unless packet.nil?
      @last_packet_received_at = Time.now
      if packet.is_a?(MqttRails::Packet::Connack)
        return handle_connack(packet)
      else
        handle_packet(packet)
      end
    end
  end
  result
end

#register_topic_callback(topic, callback, &block) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/mqtt_rails/handler.rb', line 60

def register_topic_callback(topic, callback, &block)
  if topic.nil?
    Rails.logger.error("The topics where the callback is trying to be registered have been found nil.")
    raise ArgumentError
  end
  clear_topic_callback(topic)
  if block_given?
    @registered_callback.push([topic, block])
  elsif !(callback.nil?) && callback.is_a?(Proc)
    @registered_callback.push([topic, callback])
  end
  MQTT_ERR_SUCCESS
end

#socket=(socket) ⇒ Object



34
35
36
# File 'lib/mqtt_rails/handler.rb', line 34

def socket=(socket)
  @socket = socket
end