Class: Kafka::Consumer
- Inherits:
-
Object
show all
- Includes:
- Enumerable
- Defined in:
- lib/kafka/consumer.rb,
lib/kafka/consumer/message.rb,
lib/kafka/consumer/version.rb,
lib/kafka/consumer/partition_consumer.rb
Defined Under Namespace
Classes: Message, PartitionConsumer
Constant Summary
collapse
- BACKPRESSURE_MESSAGE_LIMIT =
1000
- VERSION =
"0.1.1"
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, subscription, zookeeper: [], max_wait_ms: 200, initial_offset: :latest_offset, logger: nil) ⇒ Consumer
Returns a new instance of Consumer.
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/kafka/consumer.rb', line 21
def initialize(name, subscription, zookeeper: [], max_wait_ms: 200, initial_offset: :latest_offset, logger: nil)
@name, @subscription = name, subscription
@max_wait_ms, @initial_offset = max_wait_ms, initial_offset
@logger = logger || Logger.new($stdout)
@cluster = Kazoo::Cluster.new(zookeeper)
@group = Kazoo::Consumergroup.new(@cluster, name)
@group.create unless @group.exists?
@instance = @group.instantiate
@instance.register(topics)
end
|
Instance Attribute Details
#cluster ⇒ Object
Returns the value of attribute cluster.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def cluster
@cluster
end
|
#group ⇒ Object
Returns the value of attribute group.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def group
@group
end
|
#initial_offset ⇒ Object
Returns the value of attribute initial_offset.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def initial_offset
@initial_offset
end
|
#instance ⇒ Object
Returns the value of attribute instance.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def instance
@instance
end
|
#logger ⇒ Object
Returns the value of attribute logger.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def logger
@logger
end
|
#max_wait_ms ⇒ Object
Returns the value of attribute max_wait_ms.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def max_wait_ms
@max_wait_ms
end
|
#subscription ⇒ Object
Returns the value of attribute subscription.
16
17
18
|
# File 'lib/kafka/consumer.rb', line 16
def subscription
@subscription
end
|
Class Method Details
.distribute_partitions(instances, partitions) ⇒ Object
99
100
101
102
103
104
105
106
107
|
# File 'lib/kafka/consumer.rb', line 99
def self.distribute_partitions(instances, partitions)
return {} if instances.empty?
partitions_per_instance = partitions.length.to_f / instances.length.to_f
partitions.group_by.with_index do |partition, index|
instance_index = index.fdiv(partitions_per_instance).floor
instances[instance_index]
end
end
|
Instance Method Details
#dead? ⇒ Boolean
78
79
80
|
# File 'lib/kafka/consumer.rb', line 78
def dead?
@consumer_manager.status == false
end
|
#each(&block) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/kafka/consumer.rb', line 82
def each(&block)
mutex = Mutex.new
handler = lambda do |message|
mutex.synchronize do
block.call(message)
end
end
@consumer_manager = Thread.new do
Thread.current.abort_on_exception = true
manage_partition_consumers(handler)
end
wait
end
|
#id ⇒ Object
38
39
40
|
# File 'lib/kafka/consumer.rb', line 38
def id
instance.id
end
|
#interrupt ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/kafka/consumer.rb', line 53
def interrupt
Thread.new do
Thread.current.abort_on_exception = true
logger.info "Stopping partition consumers..."
@consumer_manager[:interrupted] = true
continue
end
end
|
#interrupted? ⇒ Boolean
65
66
67
|
# File 'lib/kafka/consumer.rb', line 65
def interrupted?
@consumer_manager[:interrupted]
end
|
#name ⇒ Object
34
35
36
|
# File 'lib/kafka/consumer.rb', line 34
def name
group.name
end
|
#partitions ⇒ Object
49
50
51
|
# File 'lib/kafka/consumer.rb', line 49
def partitions
topics.flat_map(&:partitions).sort_by { |partition| [partition.leader.id, partition.topic.name, partition.id] }
end
|
#stop ⇒ Object
69
70
71
72
|
# File 'lib/kafka/consumer.rb', line 69
def stop
interrupt
wait
end
|
#topics ⇒ Object
42
43
44
45
46
47
|
# File 'lib/kafka/consumer.rb', line 42
def topics
@topics ||= begin
topic_names = Array(subscription)
topic_names.map { |topic_name| cluster.topics.fetch(topic_name) }
end
end
|
#wait ⇒ Object
74
75
76
|
# File 'lib/kafka/consumer.rb', line 74
def wait
@consumer_manager.join if @consumer_manager.alive?
end
|