Class: MqttRails::Sender
- Inherits:
-
Object
- Object
- MqttRails::Sender
- Defined in:
- lib/mqtt_rails/sender.rb
Instance Attribute Summary
-
#last_packet_sent_at ⇒ Object
readonly
Returns the value of attribute last_packet_sent_at.
-
#last_pingreq_sent_at ⇒ Object
readonly
Returns the value of attribute last_pingreq_sent_at.
Instance Method Summary
- #append_to_writing(packet) ⇒ Object
- #check_ack_alive(queue, mutex) ⇒ Object
- #flush_waiting_packet(sending = true) ⇒ Object
-
#initialize(ack_timeout) ⇒ Sender
constructor
A new instance of Sender.
- #prepare_sending(queue, mutex, max_packet, packet) ⇒ Object
- #send_packet(packet) ⇒ Object
- #send_pingreq ⇒ Object
- #socket=(socket) ⇒ Object
- #writing_loop ⇒ Object
Constructor Details
#initialize(ack_timeout) ⇒ Sender
Returns a new instance of Sender.
21 22 23 24 25 26 27 28 |
# File 'lib/mqtt_rails/sender.rb', line 21
# Builds a sender that has no socket attached yet and whose two outgoing
# queues (general writing and publish) start empty.
#
# @param ack_timeout [Object] stored unchanged in @ack_timeout; it is later
#   added to a packet timestamp in check_ack_alive, so presumably a number of
#   seconds -- confirm against callers
def initialize(ack_timeout)
  @socket = nil

  # Each queue is guarded by its own mutex.
  @writing_queue = []
  @writing_mutex = Mutex.new

  @publish_queue = []
  @publish_mutex = Mutex.new

  @ack_timeout = ack_timeout
end
Instance Attribute Details
#last_packet_sent_at ⇒ Object (readonly)
Returns the value of attribute last_packet_sent_at.
18 19 20 |
# File 'lib/mqtt_rails/sender.rb', line 18
# Reader for @last_packet_sent_at.
#
# @return [Time, nil] the Time.now value recorded by send_packet right after
#   its most recent socket write, or nil when nothing has been written yet
def last_packet_sent_at
  @last_packet_sent_at
end
#last_pingreq_sent_at ⇒ Object (readonly)
Returns the value of attribute last_pingreq_sent_at.
19 20 21 |
# File 'lib/mqtt_rails/sender.rb', line 19
# Reader for @last_pingreq_sent_at.
#
# @return [Time, nil] the Time.now value recorded by send_pingreq after its
#   most recent successful send, or nil when no ping request has succeeded yet
def last_pingreq_sent_at
  @last_pingreq_sent_at
end
Instance Method Details
#append_to_writing(packet) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/mqtt_rails/sender.rb', line 66
# Queues a packet for a later writing_loop pass, blocking the caller until
# there is room in the target queue.
#
# Publish packets go to the publish queue (limit MAX_PUBLISH); every other
# packet goes to the general writing queue (limit MAX_QUEUE).
#
# @param packet [Object] the packet to enqueue
# @return [Object] MQTT_ERR_SUCCESS once the packet has been enqueued
def append_to_writing(packet)
  enqueued = false
  until enqueued
    begin
      # Re-read the queue on every attempt so a queue swapped in the meantime
      # is picked up.
      target =
        if packet.is_a?(MqttRails::Packet::Publish)
          [@publish_queue, @publish_mutex, MAX_PUBLISH]
        else
          [@writing_queue, @writing_mutex, MAX_QUEUE]
        end
      prepare_sending(*target, packet)
      enqueued = true
    rescue FullWritingException
      # Back-pressure: wait for the writer to drain the queue, then try again.
      sleep SELECT_TIMEOUT
    end
  end
  MQTT_ERR_SUCCESS
end
#check_ack_alive(queue, mutex) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/mqtt_rails/sender.rb', line 115
# Resends every entry of +queue+ whose acknowledgement deadline has passed.
#
# An entry is overdue when the current time is at or past its :timestamp plus
# @ack_timeout. Overdue packets are logged, sent again through send_packet,
# and their :timestamp is reset to the current time. The whole scan runs while
# holding +mutex+.
#
# @param queue [Enumerable<Hash>] entries with :packet and :timestamp keys
# @param mutex [Mutex] lock guarding +queue+
# @return [Object] whatever queue.each returns
def check_ack_alive(queue, mutex)
  mutex.synchronize do
    now = Time.now
    queue.each do |entry|
      next unless now >= entry[:timestamp] + @ack_timeout

      packet = entry[:packet]
      kind = packet.class
      # Every packet type except Subscribe/Unsubscribe gets its dup attribute
      # set when it is still falsy (presumably the MQTT duplicate-delivery
      # flag -- confirm against the packet classes).
      if kind != MqttRails::Packet::Subscribe && kind != MqttRails::Packet::Unsubscribe
        packet.dup ||= true
      end

      Rails.logger.info("Acknowledgement timeout is over, resending #{packet.inspect}")
      send_packet(packet)
      entry[:timestamp] = now
    end
  end
end
#flush_waiting_packet(sending = true) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/mqtt_rails/sender.rb', line 98
# Empties both outgoing queues, optionally sending their contents first.
#
# Each queue is sent and emptied while its own mutex is held, so a packet
# pushed by another thread can no longer slip in between the send pass and the
# reset and be dropped unsent. The queues are emptied in place rather than
# replaced, so a producer that was waiting on the mutex still pushes into the
# live queue.
#
# If send_packet raises, the exception propagates and the queue being flushed
# at that moment is left untouched.
#
# @param sending [Boolean] when true (default) every queued packet is passed
#   to send_packet before being discarded; when false the packets are only
#   discarded
# @return [Array] the (now empty) publish queue
def flush_waiting_packet(sending = true)
  @writing_mutex.synchronize do
    @writing_queue.each { |packet| send_packet(packet) } if sending
    @writing_queue.clear
  end

  @publish_mutex.synchronize do
    @publish_queue.each { |packet| send_packet(packet) } if sending
    @publish_queue.clear
  end
end
#prepare_sending(queue, mutex, max_packet, packet) ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/mqtt_rails/sender.rb', line 55
# Pushes +packet+ onto +queue+ unless the queue already holds +max_packet+
# entries.
#
# The capacity check and the push happen under the same lock, so concurrent
# producers cannot both pass the check and overfill the queue. Logging and
# raising are done after the lock is released.
#
# @param queue [Array] destination queue
# @param mutex [Mutex] lock guarding +queue+
# @param max_packet [Integer] maximum number of entries allowed in +queue+
# @param packet [Object] the packet to enqueue
# @return [Array] +queue+, after the packet has been appended
# @raise [FullWritingException] when the queue is already at capacity
def prepare_sending(queue, mutex, max_packet, packet)
  pushed = mutex.synchronize do
    queue.push(packet) if queue.length < max_packet
  end
  return pushed if pushed

  Rails.logger.error('Writing queue is full, slowing down')
  raise FullWritingException
end
#send_packet(packet) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/mqtt_rails/sender.rb', line 34
# Writes the serialized packet to the socket.
#
# IO::WaitWritable is rescued before StandardError. The would-block exception
# classes are StandardError subclasses, so with the clauses in the opposite
# order the wait-and-retry branch could never run and a temporarily
# unwritable socket was reported as a WritingException.
#
# @param packet [#to_s] packet whose string form is written to the socket
# @return [Object] MQTT_ERR_SUCCESS after a write, MQTT_ERR_FAIL when there is
#   no socket or the socket is closed
# @raise [WritingException] when the write fails for any other reason
def send_packet(packet)
  return MQTT_ERR_FAIL if @socket.nil? || @socket.closed?

  @socket.write(packet.to_s)
  @last_packet_sent_at = Time.now
  MQTT_ERR_SUCCESS
rescue IO::WaitWritable
  # Wait (up to SELECT_TIMEOUT per attempt) for the socket to become writable,
  # then try the whole write again.
  IO.select(nil, [@socket], nil, SELECT_TIMEOUT)
  retry
rescue StandardError
  raise WritingException
end
#send_pingreq ⇒ Object
51 52 53 |
# File 'lib/mqtt_rails/sender.rb', line 51
# Sends a Pingreq packet and, when send_packet reports MQTT_ERR_SUCCESS,
# records the current time in @last_pingreq_sent_at.
#
# @return [Time, nil] the recorded time, or nil when the send did not succeed
def send_pingreq
  outcome = send_packet(MqttRails::Packet::Pingreq.new)
  return unless outcome == MQTT_ERR_SUCCESS

  @last_pingreq_sent_at = Time.now
end
#socket=(socket) ⇒ Object
30 31 32 |
# File 'lib/mqtt_rails/sender.rb', line 30
# Attaches the socket that send_packet writes to.
#
# @param socket [Object, nil] the socket to use; send_packet calls closed?
#   and write on it, and treats nil as "not connected"
def socket=(socket)
  @socket = socket
end
#writing_loop ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/mqtt_rails/sender.rb', line 80
# Performs one write pass: sends up to MAX_QUEUE packets from the writing
# queue, then up to MAX_PUBLISH packets from the publish queue. Each queue is
# drained from the front while its own mutex is held.
#
# @return [Object] MQTT_ERR_SUCCESS
def writing_loop
  @writing_mutex.synchronize do
    sent = 0
    while sent < MAX_QUEUE && !@writing_queue.empty?
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end

  @publish_mutex.synchronize do
    sent = 0
    while sent < MAX_PUBLISH && !@publish_queue.empty?
      send_packet(@publish_queue.shift)
      sent += 1
    end
  end

  MQTT_ERR_SUCCESS
end