30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/kafkr/producer.rb', line 30
def self.send_message(message)
return if message.nil? || message.empty?
uuid = SecureRandom.uuid
message_with_uuid = nil
if Kafkr::Producer.configuration.is_json
json_message = JSON.parse(message)
json_message["uuid"] = uuid
message_with_uuid = JSON.dump(json_message)
end
encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)
begin
socket = TCPSocket.new(@configuration.host, @configuration.port)
send_queued_messages(socket)
socket.puts (encrypted_message_with_uuid)
rescue Errno::ECONNREFUSED
Kafkr.log "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
@configuration.message_queue.push(encrypted_message_with_uuid)
save_queue_to_file
rescue Errno::EPIPE
Kafkr.log "Broken pipe error. Retrying connection..."
retry_connection(encrypted_message_with_uuid)
end
uuid
end
|