Class: KafkaCommand::Broker
- Inherits: Object
- Extended by: Forwardable
- Defined in: app/models/kafka_command/broker.rb
Instance Attribute Summary
- #broker ⇒ Object (readonly): Returns the value of attribute broker.
Instance Method Summary
- #as_json ⇒ Object
- #connected? ⇒ Boolean
- #host_with_port ⇒ Object
- #initialize(broker) ⇒ Broker (constructor): A new instance of Broker.
- #offsets_for(group, topic) ⇒ Object: The wrapped broker must be the group coordinator for this call to work.
Constructor Details
#initialize(broker) ⇒ Broker
Returns a new instance of Broker.
    # File 'app/models/kafka_command/broker.rb', line 14
    def initialize(broker)
      @broker = broker
    end
Instance Attribute Details
#broker ⇒ Object (readonly)
Returns the value of attribute broker.
    # File 'app/models/kafka_command/broker.rb', line 8
    def broker
      @broker
    end
Instance Method Details
#as_json ⇒ Object
    # File 'app/models/kafka_command/broker.rb', line 22
    def as_json(*)
      {
        id: node_id,
        host: host_with_port
      }
    end
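The page does not show where node_id, host and port come from. Since the class is extended by Forwardable, one plausible reading is that they are delegated to the wrapped broker. The sketch below illustrates that reading only: BrokerSketch and FakeKafkaBroker are hypothetical names, and the def_delegators line is an assumption, not code taken from broker.rb.

```ruby
require "forwardable"
require "json"

# Hypothetical stand-in for the wrapped Kafka broker object.
FakeKafkaBroker = Struct.new(:node_id, :host, :port)

class BrokerSketch
  extend Forwardable

  # Assumption: these readers are forwarded to the wrapped broker.
  def_delegators :@broker, :node_id, :host, :port

  def initialize(broker)
    @broker = broker
  end

  # Same body as the documented #host_with_port.
  def host_with_port
    "#{host}:#{port}"
  end

  # Same shape as the documented #as_json: a plain Hash with two keys.
  def as_json(*)
    { id: node_id, host: host_with_port }
  end
end

broker = BrokerSketch.new(FakeKafkaBroker.new(1, "kafka-1.example.com", 9092))
puts JSON.generate(broker.as_json)
```

Because as_json returns a plain Hash, any JSON serializer can render it without knowing about the wrapper class.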
#connected? ⇒ Boolean
    # File 'app/models/kafka_command/broker.rb', line 40
    def connected?
      @broker.api_versions # simple request to check connections
      true
    rescue Kafka::ConnectionError
      false
    end
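The connectivity check can be tried in isolation with stub brokers. This is a minimal sketch: ReachableBroker and UnreachableBroker are hypothetical doubles, and Kafka::ConnectionError is redefined here as a stand-in only so the example runs without the Kafka client gem loaded.

```ruby
# Stand-in so the sketch runs without the Kafka client gem (assumption).
module Kafka
  class ConnectionError < StandardError; end
end

# Hypothetical double for a broker that answers the request.
class ReachableBroker
  def api_versions
    []
  end
end

# Hypothetical double for a broker that cannot be reached.
class UnreachableBroker
  def api_versions
    raise Kafka::ConnectionError, "connection refused"
  end
end

# Same logic as the documented #connected?, with the broker passed in.
def connected?(broker)
  broker.api_versions # the response is discarded; only success matters
  true
rescue Kafka::ConnectionError
  false
end

puts connected?(ReachableBroker.new)
puts connected?(UnreachableBroker.new)
```

Only Kafka::ConnectionError is rescued, so any other exception raised by the request still propagates to the caller.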
#host_with_port ⇒ Object
    # File 'app/models/kafka_command/broker.rb', line 18
    def host_with_port
      "#{host}:#{port}"
    end
#offsets_for(group, topic) ⇒ Object
The wrapped broker must be the group coordinator for this call to work.

    # File 'app/models/kafka_command/broker.rb', line 30
    def offsets_for(group, topic)
      offsets = @broker.fetch_offsets(
        group_id: group.group_id,
        topics: { topic.name => topic.partitions.map(&:partition_id) }
      ).topics[topic.name]

      offsets.keys.each { |partition_id| offsets[partition_id] = offsets[partition_id].offset }
      offsets
    end
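The data flow in offsets_for is easier to follow with a stubbed coordinator. In this sketch every Struct and the FakeCoordinator class are hypothetical stand-ins. The stub's response shape (topic name, then partition id, then an object responding to offset) is inferred from how the method reads the response, and the offsets it returns are made-up values.

```ruby
# Hypothetical stand-ins for the objects offsets_for touches.
PartitionInfo = Struct.new(:offset)
FetchResponse = Struct.new(:topics)
Partition     = Struct.new(:partition_id)
Topic         = Struct.new(:name, :partitions)
Group         = Struct.new(:group_id)

# Stub coordinator: reports partition_id * 100 as each committed offset.
class FakeCoordinator
  def fetch_offsets(group_id:, topics:)
    committed = topics.to_h do |name, partition_ids|
      [name, partition_ids.to_h { |id| [id, PartitionInfo.new(id * 100)] }]
    end
    FetchResponse.new(committed)
  end
end

# Same logic as the documented #offsets_for, with the broker passed in.
def offsets_for(broker, group, topic)
  offsets = broker.fetch_offsets(
    group_id: group.group_id,
    topics: { topic.name => topic.partitions.map(&:partition_id) }
  ).topics[topic.name]

  # Replace each per-partition info object with its bare offset number.
  offsets.keys.each { |partition_id| offsets[partition_id] = offsets[partition_id].offset }
  offsets
end

topic  = Topic.new("events", [Partition.new(0), Partition.new(1), Partition.new(2)])
group  = Group.new("billing")
result = offsets_for(FakeCoordinator.new, group, topic)
p result
```

The method overwrites the values of the response hash in place, so the caller receives a mapping from partition id to integer offset rather than to the response's info objects.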