Class: Kafkalogue::Log

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkalogue/log.rb

Instance Method Summary collapse

Constructor Details

#initialize(brokers:, topic:) ⇒ Log

Returns a new instance of Log.



8
9
10
11
12
13
14
15
# File 'lib/kafkalogue/log.rb', line 8

# Builds a log that buffers messages for a single topic.
#
# @param brokers passed straight through as the first argument of
#   Poseidon::Producer.new (presumably a list of broker addresses —
#   confirm against the Poseidon documentation)
# @param topic stored and used as the topic of every message created by #write
def initialize(brokers:, topic:)
  # Kept as an explicit Hash so it is handed to the producer as a
  # positional options argument.
  producer_options = { compression_codec: :snappy }

  @producer = Poseidon::Producer.new(brokers, PRODUCER_NAME, producer_options)
  @topic = topic
  @buffer = []
end

Instance Method Details

#flush ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/kafkalogue/log.rb', line 25

def flush
  instrument :flush do
    @producer.send_messages(@buffer)
  end

  @buffer.clear
rescue Poseidon::Errors::UnableToFetchMetadata, SocketError
  # Couldn't write to Kafka, so let's just buffer the messages for now.
end

#write(data, key:) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/kafkalogue/log.rb', line 17

# Queues a message for the configured topic.
#
# While the buffer holds fewer than MAX_BUFFER_SIZE messages, the data is
# wrapped in a Poseidon::MessageToSend and appended. Once the buffer is full
# the message is not stored and a :buffer_overflow event is instrumented.
#
# @param data the message payload, passed unchanged to Poseidon::MessageToSend
# @param key the message key; converted with #to_s before use
# @return [Object] the buffer when the message was appended, otherwise
#   whatever `instrument(:buffer_overflow)` returns
def write(data, key:)
  return instrument(:buffer_overflow) if @buffer.size >= MAX_BUFFER_SIZE

  message = Poseidon::MessageToSend.new(@topic, data, key.to_s)
  @buffer << message
end