Class: Kafka::Compressor

Inherits:
Object
Defined in:
lib/kafka/compressor.rb

Overview

Compresses message sets using a specified codec.

A message set is compressed only if its size reaches the configured threshold (the threshold: argument of #initialize); smaller message sets are left uncompressed.

Instrumentation

Whenever a message set is compressed, the notification compress.compressor.kafka will be emitted with the following payload:

  • message_count – the number of messages in the message set.
  • uncompressed_bytesize – the byte size of the original data.
  • compressed_bytesize – the byte size of the compressed data.
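The threshold check and the payload fields listed above can be sketched together. This is a minimal illustration, not the library's implementation: `RecordingInstrumenter` and `compress_if_large_enough` are hypothetical names, gzip from Ruby's standard `zlib` stands in for the codec, and the threshold is assumed to count messages.

```ruby
require "zlib"

# Hypothetical stand-in for an instrumenter: it records each event instead of
# publishing it. Not part of the documented API.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  # Yields a mutable payload hash to the block, then records the event.
  def instrument(name, payload = {})
    result = yield payload
    @events << [name, payload]
    result
  end
end

# Hypothetical helper: compress the joined messages only when the set holds at
# least `threshold` messages (an assumption about what "size" means);
# otherwise return the data untouched and emit nothing.
def compress_if_large_enough(messages, threshold:, instrumenter:)
  data = messages.join
  return data if messages.size < threshold

  instrumenter.instrument("compress.compressor.kafka") do |payload|
    compressed = Zlib.gzip(data)
    payload[:message_count] = messages.size
    payload[:uncompressed_bytesize] = data.bytesize
    payload[:compressed_bytesize] = compressed.bytesize
    compressed
  end
end

instrumenter = RecordingInstrumenter.new
small = compress_if_large_enough(["a"], threshold: 2, instrumenter: instrumenter)
large = compress_if_large_enough(["hello " * 50, "world " * 50],
                                 threshold: 2, instrumenter: instrumenter)
```

Only the second call reaches the threshold, so `instrumenter.events` ends up holding a single entry whose payload carries the three keys described above.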

Constructor Details

#initialize(codec_name: nil, threshold: 1, instrumenter:) ⇒ Compressor



# File 'lib/kafka/compressor.rb', line 26

def initialize(codec_name: nil, threshold: 1, instrumenter:)
  # Codec may be nil, in which case we won't compress.
  @codec = codec_name && Compression.find_codec(codec_name)

  @threshold = threshold
  @instrumenter = instrumenter
end
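The `codec_name && ...` guard in the constructor can be tried in isolation. The sketch below assumes a hypothetical `SketchCompression` registry in place of `Compression.find_codec`, whose real behaviour and codec objects are not shown on this page; `resolve_codec` is likewise an illustrative name.

```ruby
# Hypothetical registry standing in for Compression.find_codec; the symbols
# used as codec values are placeholders, not real codec objects.
module SketchCompression
  CODECS = { gzip: :gzip_codec, snappy: :snappy_codec }.freeze

  def self.find_codec(name)
    CODECS.fetch(name) { raise ArgumentError, "Unknown codec #{name}" }
  end
end

def resolve_codec(codec_name)
  # `nil && ...` short-circuits to nil, so the lookup is skipped entirely.
  codec_name && SketchCompression.find_codec(codec_name)
end

no_codec = resolve_codec(nil)
gzip = resolve_codec(:gzip)
```

With a nil name the lookup never runs and the result is nil, which matches the constructor comment that a nil codec means no compression.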

Instance Attribute Details

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



# File 'lib/kafka/compressor.rb', line 21

def codec
  @codec
end

Instance Method Details

#compress(record_batch, offset: -1) ⇒ Protocol::RecordBatch



# File 'lib/kafka/compressor.rb', line 37

def compress(record_batch, offset: -1)
  if record_batch.is_a?(Protocol::RecordBatch)
    compress_record_batch(record_batch)
  else
    # Deprecated message set format
    compress_message_set(record_batch, offset)
  end
end