Class: Kafka::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/message.rb

Overview

A message. The format of a message is as follows:

4 byte big-endian int: length of message in bytes (including the rest of the header, but excluding the length field itself)

1 byte: “magic” identifier (format version number)

If the magic byte == 0, there is one more header field:

4 byte big-endian int: CRC32 checksum of the payload

If the magic byte == 1, there are two more header fields:

1 byte: “attributes” (flags for compression, codec etc.)

4 byte big-endian int: CRC32 checksum of the payload

All following bytes are the payload.

Defined Under Namespace

Classes: MessageSet

Constant Summary collapse

# Magic byte of the format whose header carries only a CRC32 checksum.
MAGIC_IDENTIFIER_DEFAULT =
0
# Magic byte of the format that adds an "attributes" byte before the checksum.
MAGIC_IDENTIFIER_COMPRESSION =
1
# Codec value (attributes & COMPRESSION_CODEC_MASK): a single uncompressed message.
NO_COMPRESSION =
0
# Codec value: the payload is a gzip-compressed message set.
GZIP_COMPRESSION =
1
# Codec value: the payload is a snappy-compressed message set.
SNAPPY_COMPRESSION =
2
# unpack format for the first 5 bytes: 32-bit big-endian size, then the 8-bit magic.
BASIC_MESSAGE_HEADER =
'NC'.freeze
# unpack format for the bytes after the magic when magic == 0: 32-bit big-endian checksum.
VERSION_0_HEADER =
'N'.freeze
# unpack format for the bytes after the magic when magic == 1: 8-bit attributes,
# then 32-bit big-endian checksum.
VERSION_1_HEADER =
'CN'.freeze
# Bit mask that selects the compression codec from the attributes byte.
COMPRESSION_CODEC_MASK =
0x03

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message

Returns a new instance of Message.

[View source]

47
48
49
50
51
52
# File 'lib/kafka/message.rb', line 47

# Builds a message from its parts.
#
# @param payload [String, nil] message body; an empty string is used when nil
# @param magic [Integer] format version byte
# @param checksum [Integer, nil] checksum to store; when nil it is computed
#   from the payload via #calculate_checksum
def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
  # A new message always starts out uncompressed.
  @compression  = NO_COMPRESSION
  self.magic    = magic
  self.payload  = payload || ""
  # The payload must already be assigned: the fallback checksum reads it.
  self.checksum = checksum || calculate_checksum
end

Instance Attribute Details

#checksum ⇒ Object

Returns the value of attribute checksum.


45
46
47
# File 'lib/kafka/message.rb', line 45

# Reader for the checksum stored on this message (the @checksum
# instance variable).
#
# @return [Object] the stored checksum value
def checksum
  @checksum
end

#magic ⇒ Object

Returns the value of attribute magic.


45
46
47
# File 'lib/kafka/message.rb', line 45

# Reader for the magic (format version) value stored on this message
# (the @magic instance variable).
#
# @return [Object] the stored magic value
def magic
  @magic
end

#payload ⇒ Object

Returns the value of attribute payload.


45
46
47
# File 'lib/kafka/message.rb', line 45

# Reader for the payload stored on this message (the @payload
# instance variable).
#
# @return [Object] the stored payload
def payload
  @payload
end

Class Method Details

.ensure_snappy! ⇒ Object

[View source]

137
138
139
140
141
142
143
# File 'lib/kafka/message.rb', line 137

# Runs the given block only when a top-level +Snappy+ constant is defined.
#
# @yield the caller's snappy-dependent code
# @return [Object] whatever the block returns
# @raise [RuntimeError] "Snappy not available!" when +Snappy+ is not defined
def self.ensure_snappy!
  fail "Snappy not available!" unless Object.const_defined?("Snappy")

  yield
end

.parse_from(data) ⇒ Object

Takes a byte string containing one or more messages; returns a MessageSet with the messages parsed from the string, and the number of bytes consumed from the string.

[View source]

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/kafka/message.rb', line 65

# Parses a byte string containing one or more wire-format messages.
#
# Every offset and size is counted in BYTES (bytesize/byteslice), never in
# characters, so the result does not depend on the encoding the input string
# happens to be tagged with. This matters for decompressed message sets,
# which may come back tagged with a multibyte encoding.
#
# Parsing stops at the first message that is not completely contained in
# +data+; the bytes of that truncated message are not counted as consumed.
#
# @param data [String] raw bytes holding zero or more messages
# @return [MessageSet] built from the number of bytes consumed and the
#   parsed messages (compressed sets are flattened into the list)
# @raise [RuntimeError] on an unsupported magic number or compression codec,
#   on a declared size too small to hold the message's own header, or when a
#   compressed message set is not consumed completely
def self.parse_from(data)
  messages = []
  bytes_processed = 0
  total_bytes = data.bytesize

  while bytes_processed <= total_bytes - 5 # 5 = size of BASIC_MESSAGE_HEADER
    message_size, magic = data.byteslice(bytes_processed, 5).unpack(BASIC_MESSAGE_HEADER)
    break if bytes_processed + message_size + 4 > total_bytes # message is truncated

    case magic
    when MAGIC_IDENTIFIER_DEFAULT
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9      ...
      # |                       |     |                       |
      # |      message_size     |magic|        checksum       | payload ...
      payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
      # A size that cannot even hold the header would make us read the
      # checksum out of the next message's bytes.
      raise "malformed message: size #{message_size} is smaller than its header" if payload_size < 0

      checksum = data.byteslice(bytes_processed + 5, 4).unpack(VERSION_0_HEADER).shift
      payload  = data.byteslice(bytes_processed + 9, payload_size)
      messages << Kafka::Message.new(payload, magic, checksum)

    when MAGIC_IDENTIFIER_COMPRESSION
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  | 10      ...
      # |                       |     |     |                       |
      # |         size          |magic|attrs|        checksum       | payload ...
      payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
      raise "malformed message: size #{message_size} is smaller than its header" if payload_size < 0

      attributes, checksum = data.byteslice(bytes_processed + 5, 5).unpack(VERSION_1_HEADER)
      payload = data.byteslice(bytes_processed + 10, payload_size)

      case attributes & COMPRESSION_CODEC_MASK
      when NO_COMPRESSION # a single uncompressed message
        messages << Kafka::Message.new(payload, magic, checksum)
      when GZIP_COMPRESSION # a gzip-compressed message set -- parse recursively
        # wrap closes the reader once the block finishes, even on error.
        uncompressed = Zlib::GzipReader.wrap(StringIO.new(payload)) { |gz| gz.read }
        message_set = parse_from(uncompressed)
        # message_set.size is a byte count, so compare against bytesize.
        raise 'malformed compressed message' if message_set.size != uncompressed.bytesize
        messages.concat(message_set.messages)
      when SNAPPY_COMPRESSION # a snappy-compressed message set -- parse recursively
        ensure_snappy! do
          uncompressed = Snappy::Reader.new(StringIO.new(payload)).read
          message_set = parse_from(uncompressed)
          raise 'malformed compressed message' if message_set.size != uncompressed.bytesize
          messages.concat(message_set.messages)
        end
      else
        # https://cwiki.apache.org/confluence/display/KAFKA/Compression
        raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
      end

    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end

    bytes_processed += message_size + 4 # 4 = sizeof(message_size)
  end

  MessageSet.new(bytes_processed, messages)
end

Instance Method Details

#calculate_checksum ⇒ Object

[View source]

54
55
56
# File 'lib/kafka/message.rb', line 54

# Computes the CRC32 of the current payload.
#
# @return [Integer] Zlib CRC32 of #payload
def calculate_checksum
  Zlib.crc32(payload)
end

#encode(compression = NO_COMPRESSION) ⇒ Object

[View source]

121
122
123
124
125
126
127
128
129
# File 'lib/kafka/message.rb', line 121

# Serializes this message into its wire format: a 4-byte big-endian length
# prefix, then the magic/compression header, the 4-byte big-endian checksum
# and the payload.
#
# Note that this mutates the message: the requested codec is remembered in
# @compression, and #payload is replaced by the result of asciify_payload
# and, when compression? is true, by the result of compress_payload.
#
# @param compression [Integer] compression codec to use
# @return [String] the encoded message, including the length prefix
def encode(compression = NO_COMPRESSION)
  @compression = compression

  self.payload = asciify_payload
  self.payload = compress_payload if compression?

  data = magic_and_compression + [calculate_checksum].pack("N") + payload
  # The prefix is a byte count, so use bytesize: length counts characters
  # and would under-report a string tagged with a multibyte encoding.
  [data.bytesize].pack("N") + data
end

#ensure_snappy!(&block) ⇒ Object

[View source]

145
146
147
# File 'lib/kafka/message.rb', line 145

# Instance-level convenience that forwards the block to the class-level
# ensure_snappy!.
#
# @return [Object] whatever the class-level method returns
def ensure_snappy!(&block)
  self.class.ensure_snappy!(&block)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)
[View source]

58
59
60
# File 'lib/kafka/message.rb', line 58

# Tells whether the stored checksum matches a freshly computed one.
#
# @return [Boolean] true when #checksum equals #calculate_checksum
def valid?
  checksum == calculate_checksum
end