Class: Kafka::RoundRobinAssignmentStrategy

Inherits:
Object
Defined in:
lib/kafka/round_robin_assignment_strategy.rb

Overview

A round-robin assignment strategy inspired by the original Java client's round-robin assignor. It handles both identical and differing topic subscriptions across members of the same consumer group.

Instance Method Details

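#call(cluster:, members:, partitions:) ⇒ Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>

Assign the topic partitions to the group members.

Parameters:
  • cluster: the cluster; it is not referenced in the method body listed below.
  • members: a hash mapping member ids to member metadata; each value responds to topics.
  • partitions: the candidate partitions; each responds to topic.

Returns:
  • (Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>) a hash mapping member ids to partitions.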

# File 'lib/kafka/round_robin_assignment_strategy.rb', line 21

def call(cluster:, members:, partitions:)
  partitions_per_member = Hash.new {|h, k| h[k] = [] }
  relevant_partitions = valid_sorted_partitions(members, partitions)
  members_ids = members.keys
  iterator = (0...members.size).cycle
  idx = iterator.next

  relevant_partitions.each do |partition|
    topic = partition.topic

    while !members[members_ids[idx]].topics.include?(topic)
      idx = iterator.next
    end

    partitions_per_member[members_ids[idx]] << partition
    idx = iterator.next
  end

  partitions_per_member
end
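
The loop above can be exercised without a broker. The following is a minimal sketch, not the library's code: Member, Partition, and round_robin_sketch are hypothetical stand-ins introduced only for illustration, and the sketch sorts by topic and partition id (the listing above sorts by topic only) so that the demo output is deterministic.

```ruby
# Hypothetical stand-ins for the library's member metadata and partition types.
Member = Struct.new(:topics)
Partition = Struct.new(:topic, :partition_id)

# Standalone restatement of the round-robin loop, under the assumptions above.
def round_robin_sketch(members, partitions)
  subscribed = members.values.flat_map(&:topics).uniq
  relevant = partitions
    .select { |p| subscribed.include?(p.topic) }
    .sort_by { |p| [p.topic, p.partition_id] } # assumption: extra sort key for a stable demo

  assignment = Hash.new { |h, k| h[k] = [] }
  member_ids = members.keys
  cursor = (0...member_ids.size).cycle
  idx = cursor.next

  relevant.each do |partition|
    # Advance past members that are not subscribed to this partition's topic.
    idx = cursor.next until members[member_ids[idx]].topics.include?(partition.topic)
    assignment[member_ids[idx]] << partition
    idx = cursor.next
  end

  assignment
end

members = {
  "m1" => Member.new(["a", "b"]),
  "m2" => Member.new(["a"]),
}
partitions = [
  Partition.new("a", 0), Partition.new("a", 1),
  Partition.new("b", 0), Partition.new("b", 1),
  Partition.new("c", 0), # no member subscribes to "c", so it is never assigned
]

result = round_robin_sketch(members, partitions)
result.each do |id, assigned|
  puts "#{id}: #{assigned.map { |p| "#{p.topic}-#{p.partition_id}" }.join(', ')}"
end
# m1: a-0, b-0, b-1
# m2: a-1
```

Because m2 is not subscribed to topic "b", the cursor skips it for both "b" partitions, which is how differing subscriptions within one group are handled.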

#protocol_name ⇒ Object

# File 'lib/kafka/round_robin_assignment_strategy.rb', line 8

def protocol_name
  "roundrobin"
end

#valid_sorted_partitions(members, partitions) ⇒ Object

# File 'lib/kafka/round_robin_assignment_strategy.rb', line 42

def valid_sorted_partitions(members, partitions)
  subscribed_topics = members.map do |id, metadata|
    metadata && metadata.topics
  end.flatten.compact

  partitions
    .select { |partition| subscribed_topics.include?(partition.topic) }
    .sort_by { |partition| partition.topic }
end