Class: SimpleKafkaConsumer::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_kafka_consumer/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kafka_servers, zookeeper_servers, logger: nil) ⇒ Consumer

Returns a new instance of Consumer.



5
6
7
8
9
10
11
12
13
# File 'lib/simple_kafka_consumer/consumer.rb', line 5

def initialize(kafka_servers, zookeeper_servers, logger: nil)
  @consumer = Poseidon::ConsumerGroup.new(
    group_name, 
    kafka_servers, 
    zookeeper_servers, 
    topic_name
  )
  @logger = logger
end

Instance Attribute Details

#consumerObject (readonly)

Returns the value of attribute consumer.



4
5
6
# File 'lib/simple_kafka_consumer/consumer.rb', line 4

def consumer
  @consumer
end

#loggerObject (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/simple_kafka_consumer/consumer.rb', line 4

def logger
  @logger
end

Instance Method Details

#runObject



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/simple_kafka_consumer/consumer.rb', line 15

def run
  debug "partitions: #{consumer.partitions}"
  debug "claimed: #{consumer.claimed}"
  consumer.fetch_loop do |partition, bulk|
    bulk.each do |message|
      consume(parse(message))
    end
  end
rescue ZK::Exceptions::OperationTimeOut => e
  log e.message
  retry
end