Class: MqttRails::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt_rails/publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(sender) ⇒ Publisher

Returns a new instance of Publisher.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/mqtt_rails/publisher.rb', line 18

def initialize(sender)
  @waiting_puback  = []
  @waiting_pubrec  = []
  @waiting_pubrel  = []
  @waiting_pubcomp = []
  @puback_mutex    = Mutex.new
  @pubrec_mutex    = Mutex.new
  @pubrel_mutex    = Mutex.new
  @pubcomp_mutex   = Mutex.new
  @sender          = sender
end

Instance Method Details

#check_waiting_publisherObject



159
160
161
162
163
164
# File 'lib/mqtt_rails/publisher.rb', line 159

def check_waiting_publisher
  @sender.check_ack_alive(@waiting_puback, @puback_mutex)
  @sender.check_ack_alive(@waiting_pubrec, @pubrec_mutex)
  @sender.check_ack_alive(@waiting_pubrel, @pubrel_mutex)
  @sender.check_ack_alive(@waiting_pubcomp, @pubcomp_mutex)
end

#config_all_message_queueObject



144
145
146
147
148
149
# File 'lib/mqtt_rails/publisher.rb', line 144

def config_all_message_queue
  config_message_queue(@waiting_puback, @puback_mutex)
  config_message_queue(@waiting_pubrec, @pubrec_mutex)
  config_message_queue(@waiting_pubrel, @pubrel_mutex)
  config_message_queue(@waiting_pubcomp, @pubcomp_mutex)
end

#config_message_queue(queue, mutex) ⇒ Object



151
152
153
154
155
156
157
# File 'lib/mqtt_rails/publisher.rb', line 151

def config_message_queue(queue, mutex)
  mutex.synchronize do
    queue.each do |pck|
      pck[:timestamp] = Time.now
    end
  end
end

#do_puback(packet_id) ⇒ Object



90
91
92
93
94
95
# File 'lib/mqtt_rails/publisher.rb', line 90

def do_puback(packet_id)
  @puback_mutex.synchronize do
    @waiting_puback.delete_if { |pck| pck[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_pubcomp(packet_id) ⇒ Object



137
138
139
140
141
142
# File 'lib/mqtt_rails/publisher.rb', line 137

def do_pubcomp(packet_id)
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp.delete_if { |pck| pck[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_publish(qos, packet_id) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/mqtt_rails/publisher.rb', line 68

def do_publish(qos, packet_id)
  case qos
  when 0
  when 1
    send_puback(packet_id)
  when 2
    send_pubrec(packet_id)
  else
    Rails.logger.error("The packet QoS value is invalid in publish.")
    raise PacketException.new('Invalid publish QoS value')
  end
  MQTT_ERR_SUCCESS
end

#do_pubrec(packet_id) ⇒ Object



106
107
108
109
110
111
# File 'lib/mqtt_rails/publisher.rb', line 106

def do_pubrec(packet_id)
  @pubrec_mutex.synchronize do
    @waiting_pubrec.delete_if { |pck| pck[:id] == packet_id }
  end
  send_pubrel(packet_id)
end

#do_pubrel(packet_id) ⇒ Object



122
123
124
125
126
127
# File 'lib/mqtt_rails/publisher.rb', line 122

def do_pubrel(packet_id)
  @pubrel_mutex.synchronize do
    @waiting_pubrel.delete_if { |pck| pck[:id] == packet_id }
  end
  send_pubcomp(packet_id)
end

#flush_publisherObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/mqtt_rails/publisher.rb', line 166

def flush_publisher
  @puback_mutex.synchronize do
    @waiting_puback = []
  end
  @pubrec_mutex.synchronize do
    @waiting_pubrec = []
  end
  @pubrel_mutex.synchronize do
    @waiting_pubrel = []
  end
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp = []
  end
end

#push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/mqtt_rails/publisher.rb', line 58

def push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id)
  if waiting_queue.length >= max_packet
    raise FullQueueException
  end
  queue_mutex.synchronize do
    waiting_queue.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  MQTT_ERR_SUCCESS
end

#send_puback(packet_id) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/mqtt_rails/publisher.rb', line 82

def send_puback(packet_id)
  packet = MqttRails::Packet::Puback.new(
    :id => packet_id
  )
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_pubcomp(packet_id) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/mqtt_rails/publisher.rb', line 129

def send_pubcomp(packet_id)
  packet = MqttRails::Packet::Pubcomp.new(
    :id => packet_id
  )
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_publish(topic, payload, retain, qos, new_id) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/mqtt_rails/publisher.rb', line 34

def send_publish(topic, payload, retain, qos, new_id)
  packet = MqttRails::Packet::Publish.new(
    :id      => new_id,
    :topic   => topic,
    :payload => payload,
    :retain  => retain,
    :qos     => qos
  )
  begin
    case qos
    when 1
      push_queue(@waiting_puback, @puback_mutex, MAX_QUEUE, packet, new_id)
    when 2
      push_queue(@waiting_pubrec, @pubrec_mutex, MAX_QUEUE, packet, new_id)
    end
  rescue FullQueueException
    Rails.logger.warn("PUBLISH queue is full, waiting for publishing #{packet.inspect}")
    sleep SELECT_TIMEOUT
    retry
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_pubrec(packet_id) ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/mqtt_rails/publisher.rb', line 97

def send_pubrec(packet_id)
  packet = MqttRails::Packet::Pubrec.new(
    :id => packet_id
  )
  push_queue(@waiting_pubrel, @pubrel_mutex, MAX_QUEUE, packet, packet_id)
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_pubrel(packet_id) ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/mqtt_rails/publisher.rb', line 113

def send_pubrel(packet_id)
  packet = MqttRails::Packet::Pubrel.new(
    :id => packet_id
  )
  push_queue(@waiting_pubcomp, @pubcomp_mutex, MAX_QUEUE, packet, packet_id)
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#sender=(sender) ⇒ Object



30
31
32
# File 'lib/mqtt_rails/publisher.rb', line 30

def sender=(sender)
  @sender = sender
end