Module: Kafkr::Producer

Defined in:
lib/kafkr/producer.rb

Constant Summary collapse

MESSAGE_QUEUE =
"./.kafkr/message_queue.txt"
@@file_mutex =
Mutex.new

Class Method Summary collapse

Class Method Details

.configuration ⇒ Object



14
15
16
17
18
19
20
21
22
# File 'lib/kafkr/producer.rb', line 14

# Returns the producer's shared configuration object, creating it on first use.
#
# Every call makes sure the "./.kafkr" directory exists, resets the in-memory
# message queue to an empty array and then invokes load_queue_from_file
# (presumably to repopulate the queue from disk -- confirm against that helper).
#
# Defaults for +queue_file+ and +is_json+ are applied only while they are
# unset. This method is called again on later accesses (send_message reads
# +configuration.is_json+), so assigning them unconditionally would silently
# discard values a caller set via +configure+.
#
# @return [OpenStruct] the memoized configuration object
def self.configuration
  FileUtils.mkdir_p "./.kafkr"
  @configuration ||= OpenStruct.new
  @configuration.queue_file ||= MESSAGE_QUEUE
  @configuration.message_queue = []
  load_queue_from_file
  # Explicit nil check: `||=` would turn a deliberate `false` back into `true`.
  @configuration.is_json = true if @configuration.is_json.nil?
  @configuration
end

.configure ⇒ Object



24
25
26
27
28
# File 'lib/kafkr/producer.rb', line 24

# Yields the shared configuration object to the caller's block so settings
# can be assigned on it.
#
# Any StandardError raised while obtaining the configuration or running the
# block is caught and reported through +logger.error+ instead of propagating.
#
# @yieldparam settings the object returned by +configuration+
# @return the block's result, or the result of +logger.error+ on failure
def self.configure
  settings = configuration
  yield settings
rescue StandardError => error
  logger.error("Configuration error: #{error.message}")
end

.send_message(message) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kafkr/producer.rb', line 30

# Encrypts +message+ and writes it to the configured host/port over TCP.
#
# In JSON mode (+configuration.is_json+ truthy) the message is parsed, a
# freshly generated "uuid" key is added, and the result is re-serialized
# before encryption. Before the new message is written, send_queued_messages
# is called with the open socket.
#
# Failure handling:
# * Errno::ECONNREFUSED -- the encrypted message is appended to the in-memory
#   queue and save_queue_to_file is called.
# * Errno::EPIPE -- retry_connection is called with the encrypted message.
# Other exceptions propagate to the caller.
#
# @param message [String, nil] payload to send; a JSON document in JSON mode
# @return [String, nil] the generated UUID, or nil when +message+ is nil/empty
# @raise [JSON::ParserError] in JSON mode when +message+ is not valid JSON
def self.send_message(message)
  return if message.nil? || message.empty?

  uuid = SecureRandom.uuid
  # Fall back to the raw payload so the encryptor never receives nil when
  # JSON mode is off.
  # NOTE(review): in that case the returned uuid is not embedded in what is
  # sent -- confirm the intended non-JSON wire format.
  message_with_uuid = message

  if Kafkr::Producer.configuration.is_json
    json_message = JSON.parse(message)
    json_message["uuid"] = uuid
    message_with_uuid = JSON.dump(json_message)
  end

  encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)

  socket = nil
  begin
    socket = TCPSocket.new(@configuration.host, @configuration.port)
    send_queued_messages(socket)
    socket.puts(encrypted_message_with_uuid)
  rescue Errno::ECONNREFUSED
    Kafkr.log "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
    @configuration.message_queue.push(encrypted_message_with_uuid)
    save_queue_to_file
  rescue Errno::EPIPE
    Kafkr.log "Broken pipe error. Retrying connection..."
    retry_connection(encrypted_message_with_uuid)
  ensure
    # Release the descriptor on every path; socket is nil if connecting failed.
    socket&.close
  end

  uuid
end

.send_message_and_wait(message) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/kafkr/producer.rb', line 60

# Hands +message+ and a reference to +send_message+ to a new Consumer's
# +listen_for+, together with a block that picks out the matching reply.
#
# The block receives each message and a sync id from +listen_for+. It
# evaluates to the reply's "payload" when the message has a "reply" key whose
# "uuid" equals the sync id, and to nil otherwise. What +listen_for+ does
# with that value is defined by Consumer.
#
# @param message the message forwarded to +listen_for+
# @return whatever +Consumer#listen_for+ returns
def self.send_message_and_wait(message)
  consumer = Consumer.new
  consumer.listen_for(message, method(:send_message)) do |incoming, expected_uuid|
    next unless incoming.key?("reply")

    reply = incoming["reply"]
    next unless reply.dig("uuid") == expected_uuid

    reply.dig("payload")
  end
end