Class: MQTTPipe::Pipe
- Inherits:
-
Object
- Object
- MQTTPipe::Pipe
- Defined in:
- lib/mqtt_pipe/pipe.rb
Overview
The actual wrapper class for MQTT
Defined Under Namespace
Classes: ConnectionError, Context
Instance Method Summary collapse
-
#initialize(&block) ⇒ Pipe
constructor
Create a new pipe and optionally yield to a block.
-
#on(topic, &action) ⇒ Object
Subscribe to a topic and attatch an action that will be called once a message with a matching topic is received.
-
#on_anything(&action) ⇒ Object
(also: #on_everything)
Subscribe to all topics.
- #on_error(&action) ⇒ Object
-
#open(host, port: 1883, &block) ⇒ Object
Open the pipe.
- #topics ⇒ Object
Constructor Details
#initialize(&block) ⇒ Pipe
Create a new pipe and optionally yield to a block
16 17 18 19 20 21 |
# File 'lib/mqtt_pipe/pipe.rb', line 16 def initialize &block @listeners = [] @error_handler = nil instance_eval &block unless block.nil? end |
Instance Method Details
#on(topic, &action) ⇒ Object
Subscribe to a topic and attatch an action that will be called once a message with a matching topic is received.
28 29 30 31 |
# File 'lib/mqtt_pipe/pipe.rb', line 28 def on topic, &action raise ArgumentError, 'No block given' if action.nil? @listeners << Listener.new(topic, &action) end |
#on_anything(&action) ⇒ Object Also known as: on_everything
Subscribe to all topics
36 37 38 |
# File 'lib/mqtt_pipe/pipe.rb', line 36 def on_anything &action on '#', &action end |
#on_error(&action) ⇒ Object
40 41 42 |
# File 'lib/mqtt_pipe/pipe.rb', line 40 def on_error &action @error_handler = action end |
#open(host, port: 1883, &block) ⇒ Object
Open the pipe
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/mqtt_pipe/pipe.rb', line 53 def open host, port: 1883, &block listener_thread = nil client = MQTT::Client.connect host: host, port: port context = Context.new client unless @listeners.empty? listener_thread = Thread.new(Thread.current) do |parent| client.get do |topic, data| begin unpacked_data = Packer.unpack data @listeners.each do |listener| if m = listener.match(topic) #listener.call unpacked_data, *m context.instance_exec unpacked_data, *m, &listener.action end end rescue Packer::FormatError @error_handler.call topic, data unless @error_handler.nil? next # Raise the exception in the parent thread rescue Exception => e parent.raise e end end end client.subscribe *topics end # Call user block if block_given? begin context.instance_eval &block rescue ConnectionError puts 'TODO: Handle reconnect' rescue Interrupt exit end end # Join with listener thread begin listener_thread.join unless listener_thread.nil? rescue Interrupt end ensure client.disconnect if client listener_thread.exit if listener_thread end |
#topics ⇒ Object
46 47 48 |
# File 'lib/mqtt_pipe/pipe.rb', line 46 def topics @listeners.map{|listener| listener.topic } end |