Class: MQTTPipe::Pipe

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt_pipe/pipe.rb

Overview

The actual wrapper class for MQTT

Defined Under Namespace

Classes: ConnectionError, Context

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ Pipe

Create a new pipe and optionally yield to a block



16
17
18
19
20
21
# File 'lib/mqtt_pipe/pipe.rb', line 16

def initialize &block
  @listeners = []
  @error_handler = nil
  
  instance_eval &block unless block.nil?
end

Instance Method Details

#on(topic, &action) ⇒ Object

Subscribe to a topic and attatch an action that will be called once a message with a matching topic is received.

Raises:

  • (ArgumentError)


28
29
30
31
# File 'lib/mqtt_pipe/pipe.rb', line 28

def on topic, &action
  raise ArgumentError, 'No block given' if action.nil?
  @listeners << Listener.new(topic, &action)
end

#on_anything(&action) ⇒ Object Also known as: on_everything

Subscribe to all topics



36
37
38
# File 'lib/mqtt_pipe/pipe.rb', line 36

def on_anything &action
  on '#', &action
end

#on_error(&action) ⇒ Object



40
41
42
# File 'lib/mqtt_pipe/pipe.rb', line 40

def on_error &action
  @error_handler = action
end

#open(host, port: 1883, &block) ⇒ Object

Open the pipe



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/mqtt_pipe/pipe.rb', line 53

def open host, port: 1883, &block
  listener_thread = nil
  client  = MQTT::Client.connect host: host, port: port
  context = Context.new client
  
  unless @listeners.empty?
    listener_thread = Thread.new(Thread.current) do |parent|          
      client.get do |topic, data|
        begin
          unpacked_data = Packer.unpack data
          
          @listeners.each do |listener|
            if m = listener.match(topic)
              #listener.call unpacked_data, *m
              context.instance_exec unpacked_data, *m, &listener.action
            end
          end
          
        rescue Packer::FormatError
          @error_handler.call topic, data unless @error_handler.nil?
          next
          
        # Raise the exception in the parent thread
        rescue Exception => e
          parent.raise e
        end
      end
    end
    
    client.subscribe *topics
  end
  
  # Call user block
  if block_given?
    begin
      context.instance_eval &block
    rescue ConnectionError
      puts 'TODO: Handle reconnect'
    rescue Interrupt
      exit
    end
  end
  
  # Join with listener thread
  begin
    listener_thread.join unless listener_thread.nil?
  rescue Interrupt
  end
  
ensure
  client.disconnect if client
  listener_thread.exit if listener_thread
end

#topicsObject



46
47
48
# File 'lib/mqtt_pipe/pipe.rb', line 46

def topics
  @listeners.map{|listener| listener.topic }
end