Class: MQTT::BaseHandler
- Inherits:
-
Object
- Object
- MQTT::BaseHandler
- Includes:
- XasLogger::Mix
- Defined in:
- lib/mqtt/base_handler.rb
Direct Known Subclasses
Custom subscription handling collapse
-
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
-
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription.
Class Method Summary collapse
-
.get_topic_split(topicName) ⇒ Array<String>
Split a Topic into a Topic-Array.
-
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
Match a topic string to a topic pattern.
Instance Method Summary collapse
- #destroy! ⇒ Object
-
#initialize(mqttClient, logger: nil) ⇒ BaseHandler
constructor
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
Constructor Details
#initialize(mqttClient, logger: nil) ⇒ BaseHandler
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/mqtt/base_handler.rb', line 287 def initialize(mqttClient, logger: nil) @callbackList = Array.new(); if mqttClient.is_a? String @mqtt = MQTT::Client.new(mqttClient); @mqtt.clean_session = false; else @mqtt = mqttClient; end init_x_log("MQTT #{@mqtt.host}", logger); self.log_level = Logger::INFO; @conChangeMutex = Mutex.new(); @connected = false; @reconnectCount = 0; @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8); @packetQueue = Array.new(); @packetQueueMutex = Mutex.new(); @publisherThreadWaiting = false; @subscribedTopics = Hash.new(); @trackerHash = Hash.new(); @listenerThread = Thread.new do ensure_clean_start(); mqtt_resub_thread(); end @listenerThread.abort_on_exception = true; begin Timeout.timeout(5) { until(@connected) sleep 0.1; end } rescue Timeout::Error x_loge("Broker did not connect!"); end @publisherThread = Thread.new do mqtt_push_thread(); end @publisherThread.abort_on_exception = true; at_exit { destroy!() } end |
Class Method Details
.get_topic_split(topicName) ⇒ Array<String>
This function is mainly used for background processing.
Split a Topic into a Topic-Array
16 17 18 |
# File 'lib/mqtt/base_handler.rb', line 16 def self.get_topic_split(topicName) return topicName.scan(/[^\/]+/); end |
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/mqtt/base_handler.rb', line 29 def self.getTopicMatch(receivedTopicString, topicPattern) receivedTopicList = get_topic_split receivedTopicString; outputTopicList = Array.new(); return nil unless receivedTopicList.length >= topicPattern.length; topicPattern.each_index do |i| if(topicPattern[i] == "+") outputTopicList << receivedTopicList[i]; elsif(topicPattern[i] == "#") outputTopicList.concat receivedTopicList[i..-1]; return outputTopicList; elsif topicPattern[i] != receivedTopicList[i]; return nil; end end return outputTopicList if topicPattern.length == receivedTopicList.length; return nil; end |
Instance Method Details
#destroy! ⇒ Object
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/mqtt/base_handler.rb', line 246 def destroy!() return if @destroying @destroying = true; unless @packetQueue.empty? x_logd "Finishing sending of MQTT messages ... " @publisherThread.run() if @publisherThreadWaiting begin Timeout.timeout(4) { until @packetQueue.empty? do sleep 0.05; end } rescue Timeout::Error x_logw "Not all messages were published"; else x_logd "Publish clean finished" end end @publisherThread.run(); @publisherThread.join(); @listenerThread.kill(); @mqtt.disconnect() if @connected ensure_clean_exit(); x_logi("Fully disconnected!"); end |
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
113 114 115 116 117 118 119 |
# File 'lib/mqtt/base_handler.rb', line 113 def register_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return if @callbackList.include? subObject; @callbackList << subObject; queue_packet({type: :sub, topic: subObject.topic, qos: subObject.qos}); end |
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.
102 103 104 105 106 107 108 |
# File 'lib/mqtt/base_handler.rb', line 102 def unregister_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return unless @callbackList.include? subObject; queue_packet({type: :unsub, topic: subObject.topic}); @callbackList.delete(subObject); end |