Class: Krump::KafkaConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/krump/kafka_consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(brokers, topic, partition, offset) ⇒ KafkaConsumer

Returns a new instance of KafkaConsumer.



9
10
11
12
13
14
15
16
17
# File 'lib/krump/kafka_consumer.rb', line 9

def initialize(brokers, topic, partition, offset)
  @topic = topic
  @partition = partition
  @offset = offset
  @broker = find_broker_for_partition_or_fail(brokers.clone)
  @consumer = init_consumer
  @messages_read = 0
  @last_fetch_size = 0
end

Instance Attribute Details

#brokerObject (readonly)

Returns the value of attribute broker.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def broker
  @broker
end

#consumerObject (readonly)

Returns the value of attribute consumer.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def consumer
  @consumer
end

#last_fetch_sizeObject (readonly)

Returns the value of attribute last_fetch_size.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def last_fetch_size
  @last_fetch_size
end

#messages_readObject

Returns the value of attribute messages_read.



6
7
8
# File 'lib/krump/kafka_consumer.rb', line 6

def messages_read
  @messages_read
end

#offsetObject (readonly)

Returns the value of attribute offset.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def offset
  @offset
end

#partitionObject (readonly)

Returns the value of attribute partition.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def partition
  @partition
end

#topicObject (readonly)

Returns the value of attribute topic.



7
8
9
# File 'lib/krump/kafka_consumer.rb', line 7

def topic
  @topic
end

Instance Method Details

#fetchObject



19
20
21
22
23
# File 'lib/krump/kafka_consumer.rb', line 19

def fetch
  messages = @consumer.fetch
  @last_fetch_size = messages.size
  messages
end