Class: MqttRails::Sender

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt_rails/sender.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ack_timeout) ⇒ Sender

Returns a new instance of Sender.



21
22
23
24
25
26
27
28
# File 'lib/mqtt_rails/sender.rb', line 21

# Builds a sender with no socket attached, empty outgoing queues and no
# packet or PINGREQ recorded as sent yet.
#
# @param ack_timeout [Numeric] delay added to a queued packet's :timestamp
#   by check_ack_alive before that packet is resent (seconds, since it is
#   added to a Time)
def initialize(ack_timeout)
  @socket               = nil
  @writing_queue        = []
  @publish_queue        = []
  @publish_mutex        = Mutex.new
  @writing_mutex        = Mutex.new
  @ack_timeout          = ack_timeout
  # Both timestamps have public readers; define them up front so the
  # "nothing sent yet" state is explicit rather than an undefined ivar.
  @last_packet_sent_at  = nil
  @last_pingreq_sent_at = nil
end

Instance Attribute Details

#last_packet_sent_at ⇒ Object (readonly)

Returns the value of attribute last_packet_sent_at.



18
19
20
# File 'lib/mqtt_rails/sender.rb', line 18

# Reader for @last_packet_sent_at.
#
# @return [Time, nil] the Time recorded by send_packet after its most recent
#   successful socket write; nil while no write has succeeded
def last_packet_sent_at
  @last_packet_sent_at
end

#last_pingreq_sent_at ⇒ Object (readonly)

Returns the value of attribute last_pingreq_sent_at.



19
20
21
# File 'lib/mqtt_rails/sender.rb', line 19

# Reader for @last_pingreq_sent_at.
#
# @return [Time, nil] the Time recorded by send_pingreq when its PINGREQ was
#   written successfully; nil while no PINGREQ has been sent
def last_pingreq_sent_at
  @last_pingreq_sent_at
end

Instance Method Details

#append_to_writing(packet) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/mqtt_rails/sender.rb', line 66

# Enqueues +packet+ for a later writing_loop pass, blocking until there is
# room for it.
#
# Publish packets go to the publish queue (capped at MAX_PUBLISH); every
# other packet goes to the general writing queue (capped at MAX_QUEUE).
# When prepare_sending reports a full queue, the caller sleeps for
# SELECT_TIMEOUT and the whole selection + enqueue is attempted again, with
# no upper bound on the number of attempts.
#
# @param packet [Object] packet to enqueue
# @return MQTT_ERR_SUCCESS once the packet has been queued
def append_to_writing(packet)
  # Selected inside the retried body so each attempt re-reads the current
  # queue instance variables.
  destination =
    if packet.is_a?(MqttRails::Packet::Publish)
      [@publish_queue, @publish_mutex, MAX_PUBLISH]
    else
      [@writing_queue, @writing_mutex, MAX_QUEUE]
    end
  prepare_sending(*destination, packet)
  MQTT_ERR_SUCCESS
rescue FullWritingException
  sleep SELECT_TIMEOUT
  retry
end

#check_ack_alive(queue, mutex) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/mqtt_rails/sender.rb', line 115

# Resends every packet in +queue+ whose acknowledgement has not arrived
# within @ack_timeout, and restarts the timer for each resent packet.
#
# The original line `pck[:packet].dup ||= true` invoked Object#dup (always
# truthy), so nothing was ever flagged. The duplicate-delivery flag is now
# set through `duplicate=` when the packet exposes that writer.
# NOTE(review): assumes the packet classes name the DUP flag accessor
# `duplicate` (as in ruby-mqtt-derived packet classes) — confirm against
# MqttRails::Packet::Publish. Packets without the writer are resent
# unchanged, exactly as before.
#
# @param queue [Array<Hash>] entries read through the :packet and
#   :timestamp keys; :timestamp is overwritten on resend
# @param mutex [Mutex] lock held for the whole scan of +queue+
# @return [Array<Hash>] +queue+ itself
# @raise whatever send_packet raises for a failed write
def check_ack_alive(queue, mutex)
  mutex.synchronize do
    now = Time.now
    queue.each do |pck|
      next if now < pck[:timestamp] + @ack_timeout

      packet = pck[:packet]
      # Subscribe / Unsubscribe are resent as-is (exact class match, as in
      # the original comparison on #class).
      control_request = packet.instance_of?(MqttRails::Packet::Subscribe) ||
                        packet.instance_of?(MqttRails::Packet::Unsubscribe)
      packet.duplicate = true if !control_request && packet.respond_to?(:duplicate=)

      Rails.logger.info("Acknowledgement timeout is over, resending #{packet.inspect}")
      send_packet(packet)
      pck[:timestamp] = now
    end
  end
end

#flush_waiting_packet(sending = true) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/mqtt_rails/sender.rb', line 98

# Empties both outgoing queues, optionally writing their contents to the
# socket first.
#
# Each queue is drained and cleared while its own mutex is held, and is
# cleared in place instead of being replaced by a new Array. Previously the
# reset happened outside the locks by reassignment, so a packet enqueued
# between the flush and the reset was silently dropped, and a producer that
# already held the old Array kept pushing into an orphaned object.
#
# @param sending [Boolean] when true every queued packet is passed to
#   send_packet before the queues are cleared; when false they are discarded
# @return [Array] the (now empty) publish queue
# @raise whatever send_packet raises; the queue being flushed at that point
#   is left uncleared
def flush_waiting_packet(sending=true)
  [[@writing_mutex, @writing_queue], [@publish_mutex, @publish_queue]].each do |mutex, queue|
    mutex.synchronize do
      queue.each { |packet| send_packet(packet) } if sending
      queue.clear
    end
  end
  @publish_queue
end

#prepare_sending(queue, mutex, max_packet, packet) ⇒ Object



55
56
57
58
59
60
61
62
63
64
# File 'lib/mqtt_rails/sender.rb', line 55

# Pushes +packet+ onto +queue+ unless the queue already holds +max_packet+
# entries.
#
# The capacity check and the push happen inside the same critical section;
# checking the length before taking the lock allowed two concurrent
# producers to both pass the check and overfill the queue.
#
# @param queue [Array] destination queue
# @param mutex [Mutex] lock protecting +queue+
# @param max_packet [Integer] maximum number of entries allowed in +queue+
# @param packet [Object] packet to enqueue
# @return [Array] +queue+, when the packet was accepted
# @raise [FullWritingException] when +queue+ is full (an error is logged first)
def prepare_sending(queue, mutex, max_packet, packet)
  # Array#push returns the queue (truthy); a full queue yields nil.
  accepted = mutex.synchronize do
    queue.push(packet) if queue.length < max_packet
  end
  return accepted if accepted

  # Log and raise after the lock has been released.
  Rails.logger.error('Writing queue is full, slowing down')
  raise FullWritingException
end

#send_packet(packet) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/mqtt_rails/sender.rb', line 34

# Writes +packet+ to the attached socket and records the time of the write.
#
# The IO::WaitWritable clause is listed before StandardError: the concrete
# "would block" exceptions that include IO::WaitWritable are themselves
# StandardErrors, so with the previous ordering the wait-and-retry branch
# could never run and a temporarily unwritable socket surfaced as a
# WritingException.
#
# @param packet [#to_s] packet whose string form is written to the socket
# @return MQTT_ERR_SUCCESS after a successful write, MQTT_ERR_FAIL when no
#   socket is attached or the socket is closed
# @raise [WritingException] when the write fails with any other StandardError
def send_packet(packet)
  return MQTT_ERR_FAIL if @socket.nil? || @socket.closed?

  @socket.write(packet.to_s)
  @last_packet_sent_at = Time.now
  MQTT_ERR_SUCCESS
rescue IO::WaitWritable
  # Wait (at most SELECT_TIMEOUT) for the socket to become writable, then
  # run the whole method again. The number of retries is not bounded.
  IO.select(nil, [@socket], nil, SELECT_TIMEOUT)
  retry
rescue StandardError
  raise WritingException
end

#send_pingreq ⇒ Object



51
52
53
# File 'lib/mqtt_rails/sender.rb', line 51

# Sends a PINGREQ packet and, when send_packet reports success, records the
# time it was sent in @last_pingreq_sent_at.
#
# @return [Time, nil] the recorded time, or nil when the packet was not sent
def send_pingreq
  outcome = send_packet(MqttRails::Packet::Pingreq.new)
  return unless outcome == MQTT_ERR_SUCCESS

  @last_pingreq_sent_at = Time.now
end

#socket=(socket) ⇒ Object



30
31
32
# File 'lib/mqtt_rails/sender.rb', line 30

# Attaches the socket that send_packet writes to.
#
# @param socket [#write, #closed?, nil] object send_packet will call
#   closed? and write on; nil means "no socket", in which case send_packet
#   returns MQTT_ERR_FAIL
def socket=(socket)
  @socket = socket
end

#writing_loop ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/mqtt_rails/sender.rb', line 80

# Performs one bounded pass over the outgoing queues: removes and sends up
# to MAX_QUEUE packets from the writing queue, then up to MAX_PUBLISH
# packets from the publish queue. Each queue is processed while its own
# mutex is held. A packet is removed from its queue before send_packet is
# called, regardless of what send_packet returns.
#
# @return MQTT_ERR_SUCCESS
# @raise whatever send_packet raises for a failed write
def writing_loop
  @writing_mutex.synchronize do
    sent = 0
    until sent >= MAX_QUEUE || @writing_queue.empty?
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end

  @publish_mutex.synchronize do
    sent = 0
    until sent >= MAX_PUBLISH || @publish_queue.empty?
      send_packet(@publish_queue.shift)
      sent += 1
    end
  end

  MQTT_ERR_SUCCESS
end