Class: Kafka::ConsumerGroup::Assignor
- Inherits:
-
Object
- Object
- Kafka::ConsumerGroup::Assignor
- Defined in:
- lib/kafka/consumer_group/assignor.rb
Overview
A consumer group partition assignor
Defined Under Namespace
Classes: Partition
Instance Method Summary collapse
-
#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>
Assign the topic partitions to the group members.
-
#initialize(cluster:, strategy:) ⇒ Assignor
constructor
A new instance of Assignor.
- #protocol_name ⇒ Object
- #user_data ⇒ Object
Constructor Details
permalink #initialize(cluster:, strategy:) ⇒ Assignor
Returns a new instance of Assignor.
15 16 17 18 |
# File 'lib/kafka/consumer_group/assignor.rb', line 15 def initialize(cluster:, strategy:) @cluster = cluster @strategy = strategy end |
Instance Method Details
permalink #assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>
Assign the topic partitions to the group members.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/kafka/consumer_group/assignor.rb', line 35 def assign(members:, topics:) topic_partitions = topics.flat_map do |topic| begin partition_ids = @cluster.partitions_for(topic).map(&:partition_id) rescue UnknownTopicOrPartition raise UnknownTopicOrPartition, "unknown topic #{topic}" end partition_ids.map {|partition_id| Partition.new(topic, partition_id) } end group_assignment = {} members.each_key do |member_id| group_assignment[member_id] = Protocol::MemberAssignment.new end @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions| Array(partitions).each do |partition| group_assignment[member_id].assign(partition.topic, [partition.partition_id]) end end group_assignment rescue Kafka::LeaderNotAvailable sleep 1 retry end |
permalink #protocol_name ⇒ Object
[View source]
20 21 22 |
# File 'lib/kafka/consumer_group/assignor.rb', line 20 def protocol_name @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s end |
permalink #user_data ⇒ Object
[View source]
24 25 26 |
# File 'lib/kafka/consumer_group/assignor.rb', line 24 def user_data @strategy.user_data if @strategy.respond_to?(:user_data) end |