Class: KafkaSyrup::Protocol::MessageSet

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/kafka_syrup/protocol/message_set.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#load_args, #log

Constructor Details

#initialize(*args) ⇒ MessageSet

Returns a new instance of MessageSet.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kafka_syrup/protocol/message_set.rb', line 8

def initialize(*args)
  io, total_length, *_ = args
  if io.respond_to?(:read)
    total_length = 0 unless total_length.is_a?(Fixnum)

    read_length = 0

    self.messages = []

    while read_length < total_length && !io.eof?
      offset = E.read_int64(io)
      read_length += 8
      msg_length = E.read_int32(io)
      read_length += 4
      msg = Message.new(io, offset: offset, length: msg_length) rescue nil
      messages << msg
      read_length += msg_length
      yield(msg) if block_given? && msg
    end
  else
    load_args(defaults)
    load_args(*args)
  end
end

Instance Attribute Details

#messages ⇒ Object

Returns the value of attribute messages.



6
7
8
# File 'lib/kafka_syrup/protocol/message_set.rb', line 6

# Reader for the message list held by this set.
#
# @return [Object] whatever is currently stored in @messages (nil if it
#   has never been assigned)
def messages
  instance_variable_get(:@messages)
end

Instance Method Details

#==(obj) ⇒ Object



48
49
50
# File 'lib/kafka_syrup/protocol/message_set.rb', line 48

# Compares this object with another by their encoded representations.
#
# An object that does not respond to +encode+ (for example +nil+) is never
# equal; previously such a comparison raised NoMethodError.
#
# @param obj [Object] the object to compare against
# @return [Boolean] true when +obj.encode+ equals this object's +encode+
def ==(obj)
  obj.respond_to?(:encode) && obj.encode == encode
end

#defaults ⇒ Object



33
34
35
# File 'lib/kafka_syrup/protocol/message_set.rb', line 33

# Default attribute values applied before caller-supplied arguments.
#
# @return [Hash] a new hash mapping :messages to a new empty array
def defaults
  initial = {}
  initial[:messages] = []
  initial
end

#encode ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/kafka_syrup/protocol/message_set.rb', line 37

# Serializes every message in the set and concatenates the results.
#
# Each entry is written as the message offset (int64, via +to_i+), the size
# of the encoded message (int32) and the encoded message itself.
#
# The size is taken with +bytesize+ rather than +length+: +length+ counts
# characters, which under-reports the size whenever the encoded string is
# not binary and contains multibyte characters.
#
# @return [String] the concatenated entries ("" when there are no messages)
def encode
  messages.map do |msg|
    encoded = msg.encode
    [
      E.write_int64(msg.offset.to_i),
      E.write_int32(encoded.bytesize),
      encoded
    ].join
  end.join
end