Class: Kafkr::MessageBroker
- Inherits: Object (Object > Kafkr::MessageBroker)
- Defined in: lib/kafkr/message_broker.rb
Instance Attribute Summary
- #last_sent ⇒ Object: Returns the value of attribute last_sent.
- #subscribers ⇒ Object: Returns the value of attribute subscribers.
Instance Method Summary
- #add_subscriber(socket) ⇒ Object
- #broadcast(message) ⇒ Object
- #initialize ⇒ MessageBroker (constructor): A new instance of MessageBroker.
Constructor Details
#initialize ⇒ MessageBroker
Returns a new instance of MessageBroker.
    # File 'lib/kafkr/message_broker.rb', line 5
    def initialize
      @subscribers = []
      @last_sent = {}
    end
Instance Attribute Details
#last_sent ⇒ Object
Returns the value of attribute last_sent.
    # File 'lib/kafkr/message_broker.rb', line 3
    def last_sent
      @last_sent
    end
#subscribers ⇒ Object
Returns the value of attribute subscribers.
    # File 'lib/kafkr/message_broker.rb', line 3
    def subscribers
      @subscribers
    end
Instance Method Details
#add_subscriber(socket) ⇒ Object
    # File 'lib/kafkr/message_broker.rb', line 10
    def add_subscriber(socket)
      @subscribers << socket
      @last_sent[socket] = nil
    end
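The effect of add_subscriber on the broker's state can be sketched as follows. This is a minimal illustration, not the gem's own code: SketchBroker is a hypothetical stand-in that copies the constructor and add_subscriber shown on this page so the example runs without kafkr installed, and a StringIO object is assumed in place of a real socket.

```ruby
require "stringio"

# Hypothetical stand-in mirroring the state handling documented for
# Kafkr::MessageBroker; it is not the gem's class.
class SketchBroker
  attr_reader :subscribers, :last_sent

  def initialize
    @subscribers = []
    @last_sent = {}
  end

  # Register the socket and record that nothing has been sent to it yet.
  def add_subscriber(socket)
    @subscribers << socket
    @last_sent[socket] = nil
  end
end

broker = SketchBroker.new
client = StringIO.new # assumption: stands in for a connected socket

broker.add_subscriber(client)

puts broker.subscribers.size           # how many subscribers are registered
puts broker.last_sent.key?(client)     # an entry exists before any broadcast
puts broker.last_sent[client].inspect  # and its value is still nil
```

Each subscriber gets a last_sent entry at registration time, so a lookup for a known subscriber that has not yet received a message returns nil under an existing key.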
#broadcast(message) ⇒ Object
    # File 'lib/kafkr/message_broker.rb', line 15
    def broadcast(message)
      # Encrypt once, then send the same payload to every open subscriber.
      encrypted_message = Kafkr::Encryptor.new.encrypt(message)
      @subscribers.each do |subscriber|
        if !subscriber.closed?
          subscriber.puts(encrypted_message)
          @last_sent[subscriber] = encrypted_message
        end
      rescue Errno::EPIPE
        # Optionally, handle broken pipe error
      rescue IOError
        begin
          # Drop a subscriber whose stream raised an IOError.
          @subscribers.delete(subscriber)
          @last_sent.delete(subscriber)
        rescue
          Kafkr.log "clean up subscribers"
        end
      end
    end
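The broadcast flow (encrypt once, write to every open subscriber, remember the last payload per subscriber) can be sketched as below. Everything named Sketch* is hypothetical: SketchEncryptor merely reverses the string as a placeholder, because the behavior of Kafkr::Encryptor is not documented on this page, and StringIO objects are assumed in place of sockets.

```ruby
require "stringio"

# Hypothetical placeholder for Kafkr::Encryptor. Reversing a string is
# NOT encryption; it only makes the transformed payload visible.
class SketchEncryptor
  def encrypt(message)
    message.reverse
  end
end

# Hypothetical stand-in mirroring the broadcast logic documented above.
class SketchBroadcastBroker
  attr_reader :subscribers, :last_sent

  def initialize
    @subscribers = []
    @last_sent = {}
  end

  def add_subscriber(socket)
    @subscribers << socket
    @last_sent[socket] = nil
  end

  def broadcast(message)
    payload = SketchEncryptor.new.encrypt(message)
    @subscribers.each do |subscriber|
      unless subscriber.closed?
        subscriber.puts(payload)
        @last_sent[subscriber] = payload
      end
    rescue Errno::EPIPE
      # Broken pipe: ignored here, as in the documented method.
    rescue IOError
      # Stream failed: forget the subscriber.
      @subscribers.delete(subscriber)
      @last_sent.delete(subscriber)
    end
  end
end

sketch_broker = SketchBroadcastBroker.new
open_client = StringIO.new
closed_client = StringIO.new
closed_client.close

sketch_broker.add_subscriber(open_client)
sketch_broker.add_subscriber(closed_client)
sketch_broker.broadcast("hello")

puts open_client.string                              # payload written to the open client
puts sketch_broker.last_sent[closed_client].inspect  # closed client received nothing
```

Note that a subscriber reporting closed? is skipped but stays in the subscribers list; in this logic, removal happens only when a write raises IOError.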