Class: Kafkr::MessageBroker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkr/message_broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMessageBroker

Returns a new instance of MessageBroker.



5
6
7
8
# File 'lib/kafkr/message_broker.rb', line 5

def initialize
  @subscribers = []
  @last_sent = {}
end

Instance Attribute Details

#last_sentObject

Returns the value of attribute last_sent.



3
4
5
# File 'lib/kafkr/message_broker.rb', line 3

def last_sent
  @last_sent
end

#subscribersObject

Returns the value of attribute subscribers.



3
4
5
# File 'lib/kafkr/message_broker.rb', line 3

def subscribers
  @subscribers
end

Instance Method Details

#add_subscriber(socket) ⇒ Object



10
11
12
13
# File 'lib/kafkr/message_broker.rb', line 10

def add_subscriber(socket)
  @subscribers << socket
  @last_sent[socket] = nil
end

#broadcast(message) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/kafkr/message_broker.rb', line 15

def broadcast(message)
  encrypted_message = Kafkr::Encryptor.new.encrypt(message) 
  @subscribers.each do |subscriber|
    if !subscriber.closed?
      subscriber.puts(encrypted_message)
      @last_sent[subscriber] = encrypted_message
    end
  rescue Errno::EPIPE
    # Optionally, handle broken pipe error
  rescue IOError
    begin
       @subscribers.delete(subscriber)
      @last_sent.delete(subscriber)
    rescue
      Kafkr.log "clean up subscribers"
    end

  end
end