Class: Kafka::Consumer
- Inherits:
-
Object
show all
- Includes:
- Enumerable
- Defined in:
- lib/kafka/consumer.rb,
lib/kafka/consumer/message.rb,
lib/kafka/consumer/version.rb,
lib/kafka/consumer/partition_consumer.rb
Defined Under Namespace
Classes: Message, PartitionConsumer
Constant Summary
collapse
- VERSION =
"0.1.2"
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, subscription, zookeeper: nil, max_wait_ms: 200, initial_offset: :latest_offset, logger: nil) ⇒ Consumer
Returns a new instance of Consumer.
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/kafka/consumer.rb', line 19
def initialize(name, subscription, zookeeper: nil, max_wait_ms: 200, initial_offset: :latest_offset, logger: nil)
raise ArgumentError, "The consumer's name cannot be empty" if name.nil? || name.empty?
raise ArgumentError, "You have to specify a zookeeper connection string" if zookeeper.nil? || zookeeper.empty?
@name = name
@max_wait_ms, @initial_offset = max_wait_ms, initial_offset
@logger = logger || Logger.new($stdout)
@cluster = Kazoo::Cluster.new(zookeeper)
@group = Kazoo::Consumergroup.new(@cluster, name)
@group.create unless @group.exists?
@instance = @group.instantiate(subscription: Kazoo::Subscription.build(subscription)).register
end
|
Instance Attribute Details
#cluster ⇒ Object
Returns the value of attribute cluster.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def cluster
@cluster
end
|
#group ⇒ Object
Returns the value of attribute group.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def group
@group
end
|
#initial_offset ⇒ Object
Returns the value of attribute initial_offset.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def initial_offset
@initial_offset
end
|
#instance ⇒ Object
Returns the value of attribute instance.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def instance
@instance
end
|
#logger ⇒ Object
Returns the value of attribute logger.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def logger
@logger
end
|
#max_wait_ms ⇒ Object
Returns the value of attribute max_wait_ms.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def max_wait_ms
@max_wait_ms
end
|
#subscription ⇒ Object
Returns the value of attribute subscription.
14
15
16
|
# File 'lib/kafka/consumer.rb', line 14
def subscription
@subscription
end
|
Class Method Details
.distribute_partitions(instances, partitions) ⇒ Object
94
95
96
97
98
99
100
101
102
|
# File 'lib/kafka/consumer.rb', line 94
def self.distribute_partitions(instances, partitions)
return {} if instances.empty?
partitions_per_instance = partitions.length.to_f / instances.length.to_f
partitions.group_by.with_index do |partition, index|
instance_index = index.fdiv(partitions_per_instance).floor
instances[instance_index]
end
end
|
Instance Method Details
#dead? ⇒ Boolean
75
76
77
|
# File 'lib/kafka/consumer.rb', line 75
def dead?
@consumer_manager.status == false
end
|
#each(&block) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/kafka/consumer.rb', line 79
def each(&block)
mutex = Mutex.new
handler = lambda do |message|
mutex.synchronize { block.call(message) }
end
@consumer_manager = Thread.new do
Thread.current.abort_on_exception = true
manage_partition_consumers(handler)
end
wait
end
|
#id ⇒ Object
38
39
40
|
# File 'lib/kafka/consumer.rb', line 38
def id
instance.id
end
|
#interrupt ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/kafka/consumer.rb', line 50
def interrupt
Thread.new do
Thread.current.abort_on_exception = true
logger.info "Stopping partition consumers..."
@consumer_manager[:interrupted] = true
continue
end
end
|
#interrupted? ⇒ Boolean
62
63
64
|
# File 'lib/kafka/consumer.rb', line 62
def interrupted?
@consumer_manager[:interrupted]
end
|
#name ⇒ Object
34
35
36
|
# File 'lib/kafka/consumer.rb', line 34
def name
group.name
end
|
#partitions ⇒ Object
46
47
48
|
# File 'lib/kafka/consumer.rb', line 46
def partitions
subscription.partitions(@cluster).sort_by { |partition| [partition.preferred_leader.id, partition.topic.name, partition.id] }
end
|
#stop ⇒ Object
66
67
68
69
|
# File 'lib/kafka/consumer.rb', line 66
def stop
interrupt
wait
end
|
#wait ⇒ Object
71
72
73
|
# File 'lib/kafka/consumer.rb', line 71
def wait
@consumer_manager.join if @consumer_manager.alive?
end
|