Class: MQTT::Client
- Inherits:
-
Object
- Object
- MQTT::Client
- Defined in:
- lib/mqtt/client.rb
Overview
Client class for talking to an MQTT server
Constant Summary collapse
- SELECT_TIMEOUT =
Timeout between select polls (in seconds)
0.5
- ATTR_DEFAULTS =
Default attribute values
{ :host => nil, :port => nil, :version => '3.1.1', :keep_alive => 15, :clean_session => true, :client_id => nil, :ack_timeout => 5, :username => nil, :password => nil, :will_topic => nil, :will_payload => nil, :will_qos => 0, :will_retain => false, :ssl => false }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#last_ping_response ⇒ Object
readonly
Last ping response time.
-
#password ⇒ Object
Password to authenticate to the server with.
-
#port ⇒ Object
Port number of the remote server.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#username ⇒ Object
Username to authenticate to the server with.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary collapse
-
.connect(*args, &block) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary collapse
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server.
-
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server.
-
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server.
-
#initialize(*args) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#remote_host ⇒ Object
deprecated
Deprecated.
Please use #host instead
-
#remote_host=(args) ⇒ Object
deprecated
Deprecated.
Please use #host= instead
-
#remote_port ⇒ Object
deprecated
Deprecated.
Please use #port instead
-
#remote_port=(args) ⇒ Object
deprecated
Deprecated.
Please use #port= instead
-
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(*args) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:[email protected]')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/mqtt/client.rb', line 134 def initialize(*args) if args.last.is_a?(Hash) attr = args.pop else attr = {} end if args.length == 0 if ENV['MQTT_SERVER'] attr.merge!(parse_uri(ENV['MQTT_SERVER'])) end end if args.length >= 1 case args[0] when URI attr.merge!(parse_uri(args[0])) when %r|^mqtts?://| attr.merge!(parse_uri(args[0])) else attr.merge!(:host => args[0]) end end if args.length >= 2 attr.merge!(:port => args[1]) unless args[1].nil? end if args.length >= 3 raise ArgumentError, "Unsupported number of arguments" end # Merge arguments with default values for attributes ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end # Set a default port number if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables @last_ping_request = Time.now @last_ping_response = Time.now @socket = nil @read_queue = Queue.new @pubacks = {} @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end |
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
36 37 38 |
# File 'lib/mqtt/client.rb', line 36 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
30 31 32 |
# File 'lib/mqtt/client.rb', line 30 def clean_session @clean_session end |
#client_id ⇒ Object
Client Identifier
33 34 35 |
# File 'lib/mqtt/client.rb', line 33 def client_id @client_id end |
#host ⇒ Object
Hostname of the remote server
8 9 10 |
# File 'lib/mqtt/client.rb', line 8 def host @host end |
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
27 28 29 |
# File 'lib/mqtt/client.rb', line 27 def keep_alive @keep_alive end |
#last_ping_response ⇒ Object (readonly)
Last ping response time
57 58 59 |
# File 'lib/mqtt/client.rb', line 57 def last_ping_response @last_ping_response end |
#password ⇒ Object
Password to authenticate to the server with
42 43 44 |
# File 'lib/mqtt/client.rb', line 42 def password @password end |
#port ⇒ Object
Port number of the remote server
11 12 13 |
# File 'lib/mqtt/client.rb', line 11 def port @port end |
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
24 25 26 |
# File 'lib/mqtt/client.rb', line 24 def ssl @ssl end |
#username ⇒ Object
Username to authenticate to the server with
39 40 41 |
# File 'lib/mqtt/client.rb', line 39 def username @username end |
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
14 15 16 |
# File 'lib/mqtt/client.rb', line 14 def version @version end |
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect
48 49 50 |
# File 'lib/mqtt/client.rb', line 48 def will_payload @will_payload end |
#will_qos ⇒ Object
The QoS level of the will message sent by the server
51 52 53 |
# File 'lib/mqtt/client.rb', line 51 def will_qos @will_qos end |
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent
54 55 56 |
# File 'lib/mqtt/client.rb', line 54 def will_retain @will_retain end |
#will_topic ⇒ Object
The topic that the Will message is published to
45 46 47 |
# File 'lib/mqtt/client.rb', line 45 def will_topic @will_topic end |
Class Method Details
.connect(*args, &block) ⇒ Object
91 92 93 94 95 |
# File 'lib/mqtt/client.rb', line 91 def self.connect(*args, &block) client = MQTT::Client.new(*args) client.connect(&block) return client end |
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/mqtt/client.rb', line 99 def self.generate_client_id(prefix='ruby', length=16) str = prefix.dup length.times do num = rand(36) if (num<10) # Number num += 48 else # Letter num += 87 end str += num.chr end return str end |
Instance Method Details
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
215 216 217 218 219 220 |
# File 'lib/mqtt/client.rb', line 215 def ca_file=(path) ssl_context.ca_file = path unless path.nil? ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER end end |
#cert=(cert) ⇒ Object
PEM-format client certificate
198 199 200 |
# File 'lib/mqtt/client.rb', line 198 def cert=(cert) ssl_context.cert = OpenSSL::X509::Certificate.new(cert) end |
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
193 194 195 |
# File 'lib/mqtt/client.rb', line 193 def cert_file=(path) self.cert = File.read(path) end |
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/mqtt/client.rb', line 235 def connect(clientid=nil) unless clientid.nil? @client_id = clientid end if @client_id.nil? or @client_id.empty? if @clean_session if @version == '3.1.0' # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id end else raise 'Must provide a client_id if clean_session is set to false' end end if @host.nil? raise 'No MQTT server host set when attempting to connect' end if not connected? # Create network socket tcp_socket = TCPSocket.new(@host, @port) if @ssl # Set the protocol version if @ssl.is_a?(Symbol) ssl_context.ssl_version = @ssl end @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true # Set hostname on secure socket for Server Name Indication (SNI) if @socket.respond_to?(:hostname=) @socket.hostname = @host end @socket.connect else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain ) # Send packet send_packet(packet) # Receive response receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent while connected? do receive_packet end end end # If a block is given, then yield and disconnect if block_given? begin yield(self) ensure disconnect end end end |
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
336 337 338 |
# File 'lib/mqtt/client.rb', line 336 def connected? (not @socket.nil?) and (not @socket.closed?) end |
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/mqtt/client.rb', line 319 def disconnect(send_msg=true) # Stop reading packets from the socket first @read_thread.kill if @read_thread and @read_thread.alive? @read_thread = nil # Close the socket if it is open if connected? if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? @socket = nil end end |
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic, = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload|
# Do stuff here
end
402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'lib/mqtt/client.rb', line 402 def get(topic=nil, ={}) if block_given? get_packet(topic) do |packet| yield(packet.topic, packet.payload) unless packet.retain && [:omit_retained] end else loop do # Wait for one packet to be available packet = get_packet(topic) return packet.topic, packet.payload unless packet.retain && [:omit_retained] end end end |
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet
puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet|
# Do stuff here
puts packet.topic
end
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/mqtt/client.rb', line 429 def get_packet(topic=nil) # Subscribe to a topic, if an argument is given subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do packet = @read_queue.pop yield(packet) puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available packet = @read_queue.pop puback_packet(packet) if packet.qos > 0 return packet end end |
#key=(*args) ⇒ Object
Set to a PEM-format client private key
209 210 211 212 |
# File 'lib/mqtt/client.rb', line 209 def key=(*args) cert, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase) end |
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
203 204 205 206 |
# File 'lib/mqtt/client.rb', line 203 def key_file=(*args) path, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase) end |
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/mqtt/client.rb', line 341 def publish(topic, payload='', retain=false, qos=0) raise ArgumentError.new("Topic name cannot be nil") if topic.nil? raise ArgumentError.new("Topic name cannot be empty") if topic.empty? packet = MQTT::Packet::Publish.new( :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) # Send the packet res = send_packet(packet) if packet.qos > 0 Timeout.timeout(@ack_timeout) do while connected? do @pubacks_semaphore.synchronize do return res if @pubacks.delete(packet.id) end # FIXME: make threads communicate with each other, instead of polling # (using a pipe and select ?) sleep 0.01 end end return -1 end end |
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
449 450 451 |
# File 'lib/mqtt/client.rb', line 449 def queue_empty? @read_queue.empty? end |
#queue_length ⇒ Object
Returns the length of the incoming message queue.
454 455 456 |
# File 'lib/mqtt/client.rb', line 454 def queue_length @read_queue.length end |
#remote_host ⇒ Object
Please use #host instead
588 589 590 |
# File 'lib/mqtt/client.rb', line 588 def remote_host host end |
#remote_host=(args) ⇒ Object
Please use #host= instead
593 594 595 |
# File 'lib/mqtt/client.rb', line 593 def remote_host=(args) self.host = args end |
#remote_port ⇒ Object
Please use #port instead
598 599 600 |
# File 'lib/mqtt/client.rb', line 598 def remote_port port end |
#remote_port=(args) ⇒ Object
Please use #port= instead
603 604 605 |
# File 'lib/mqtt/client.rb', line 603 def remote_port=(args) self.port = args end |
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
226 227 228 229 230 231 |
# File 'lib/mqtt/client.rb', line 226 def set_will(topic, payload, retain=false, qos=0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end |
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
188 189 190 |
# File 'lib/mqtt/client.rb', line 188 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end |
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )
383 384 385 386 387 388 389 |
# File 'lib/mqtt/client.rb', line 383 def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( :id => next_packet_id, :topics => topics ) send_packet(packet) end |
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server
459 460 461 462 463 464 465 466 467 468 469 |
# File 'lib/mqtt/client.rb', line 459 def unsubscribe(*topics) if topics.is_a?(Enumerable) and topics.count == 1 topics = topics.first end packet = MQTT::Packet::Unsubscribe.new( :topics => topics, :id => next_packet_id ) send_packet(packet) end |