Class: KafkaCommand::ConsumerGroup
- Inherits:
-
Object
- Object
- KafkaCommand::ConsumerGroup
- Defined in:
- app/models/kafka_command/consumer_group.rb
Instance Attribute Summary collapse
-
#group_id ⇒ Object
readonly
Returns the value of attribute group_id.
Instance Method Summary collapse
- #as_json ⇒ Object
- #consumed_topics ⇒ Object
- #coordinator ⇒ Object
- #empty? ⇒ Boolean
- #group_metadata ⇒ Object
-
#initialize(group_id, client) ⇒ ConsumerGroup
constructor
A new instance of ConsumerGroup.
- #members ⇒ Object
- #partitions_for(topic_name) ⇒ Object
- #refresh! ⇒ Object
- #stable? ⇒ Boolean
- #state ⇒ Object
- #total_lag_for(topic_name) ⇒ Object
Constructor Details
#initialize(group_id, client) ⇒ ConsumerGroup
Returns a new instance of ConsumerGroup.
7 8 9 10 |
# File 'app/models/kafka_command/consumer_group.rb', line 7 def initialize(group_id, client) @client = client @group_id = group_id end |
Instance Attribute Details
#group_id ⇒ Object (readonly)
Returns the value of attribute group_id.
5 6 7 |
# File 'app/models/kafka_command/consumer_group.rb', line 5 def group_id @group_id end |
Instance Method Details
#as_json ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'app/models/kafka_command/consumer_group.rb', line 43 def as_json(*) topics_json = consumed_topics.map do |topic| { name: topic.name, partitions: partitions_for(topic.name).map(&:as_json) } end { group_id: @group_id, state: state, topics: topics_json } end |
#consumed_topics ⇒ Object
58 59 60 61 62 63 64 |
# File 'app/models/kafka_command/consumer_group.rb', line 58 def consumed_topics topic_names = members.flat_map(&:topic_names).uniq @client.topics.select do |t| topic_names.include?(t.name) end end |
#coordinator ⇒ Object
66 67 68 |
# File 'app/models/kafka_command/consumer_group.rb', line 66 def coordinator @coordinator ||= @client.get_group_coordinator(group_id: @group_id) end |
#empty? ⇒ Boolean
20 21 22 |
# File 'app/models/kafka_command/consumer_group.rb', line 20 def empty? state.match?(/empty/i) || members.none? end |
#group_metadata ⇒ Object
70 71 72 |
# File 'app/models/kafka_command/consumer_group.rb', line 70 def @group_metadata ||= end |
#members ⇒ Object
78 79 80 81 82 |
# File 'app/models/kafka_command/consumer_group.rb', line 78 def members .members.map do |member| GroupMember.new(member) end end |
#partitions_for(topic_name) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'app/models/kafka_command/consumer_group.rb', line 24 def partitions_for(topic_name) topic = @client.find_topic(topic_name) partition_lag = lag_for(topic.name) topic.partitions.map do |p| ConsumerGroupPartition.new( lag: partition_lag[p.partition_id][:lag], offset: partition_lag[p.partition_id][:offset], group_id: @group_id, topic_name: topic.name, partition_id: p.partition_id ) end end |
#refresh! ⇒ Object
12 13 14 |
# File 'app/models/kafka_command/consumer_group.rb', line 12 def refresh! end |
#stable? ⇒ Boolean
16 17 18 |
# File 'app/models/kafka_command/consumer_group.rb', line 16 def stable? state.match?(/stable/i) end |
#state ⇒ Object
74 75 76 |
# File 'app/models/kafka_command/consumer_group.rb', line 74 def state .state end |
#total_lag_for(topic_name) ⇒ Object
39 40 41 |
# File 'app/models/kafka_command/consumer_group.rb', line 39 def total_lag_for(topic_name) lag_for(topic_name).values.map { |lag_hash| lag_hash[:lag] || 0 }.reduce(:+) end |