Class: KafkaCommand::Client
- Inherits:
-
Object
- Object
- KafkaCommand::Client
- Extended by:
- Forwardable
- Defined in:
- app/models/kafka_command/client.rb
Constant Summary collapse
- CLUSTER_METHOD_DELGATIONS =
%i( broker_pool delete_topic alter_topic describe_topic create_partitions_for resolve_offset resolve_offsets describe_group supports_api? ).freeze
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
Instance Method Summary collapse
- #brokers ⇒ Object
- #connect_to_broker(host:, port:, broker_id:) ⇒ Object
- #fetch_metadata(topics: nil) ⇒ Object
- #find_topic(topic_name) ⇒ Object
- #get_group_coordinator(group_id:) ⇒ Object
- #groups ⇒ Object
-
#initialize(brokers:, **kwargs) ⇒ Client
constructor
A new instance of Client.
- #refresh! ⇒ Object
- #refresh_brokers! ⇒ Object
- #refresh_groups! ⇒ Object
- #refresh_topics! ⇒ Object
- #topics ⇒ Object
Constructor Details
#initialize(brokers:, **kwargs) ⇒ Client
Returns a new instance of Client.
25 26 27 28 |
# File 'app/models/kafka_command/client.rb', line 25 def initialize(brokers:, **kwargs) @client = Kafka.new(brokers, **kwargs) @cluster = @client.cluster end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
21 22 23 |
# File 'app/models/kafka_command/client.rb', line 21 def client @client end |
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
21 22 23 |
# File 'app/models/kafka_command/client.rb', line 21 def cluster @cluster end |
Instance Method Details
#brokers ⇒ Object
40 41 42 |
# File 'app/models/kafka_command/client.rb', line 40 def brokers @brokers ||= initialize_brokers end |
#connect_to_broker(host:, port:, broker_id:) ⇒ Object
73 74 75 |
# File 'app/models/kafka_command/client.rb', line 73 def connect_to_broker(host:, port:, broker_id:) Broker.new(broker_pool.connect(host, port, node_id: broker_id)) end |
#fetch_metadata(topics: nil) ⇒ Object
60 61 62 |
# File 'app/models/kafka_command/client.rb', line 60 def (topics: nil) brokers.sample.(topics: topics) end |
#find_topic(topic_name) ⇒ Object
69 70 71 |
# File 'app/models/kafka_command/client.rb', line 69 def find_topic(topic_name) topics.find { |t| t.name == topic_name } end |
#get_group_coordinator(group_id:) ⇒ Object
64 65 66 67 |
# File 'app/models/kafka_command/client.rb', line 64 def get_group_coordinator(group_id:) broker = @cluster.get_group_coordinator(group_id: group_id) Broker.new(broker) end |
#groups ⇒ Object
44 45 46 |
# File 'app/models/kafka_command/client.rb', line 44 def groups @groups ||= initialize_groups end |
#refresh! ⇒ Object
30 31 32 33 34 |
# File 'app/models/kafka_command/client.rb', line 30 def refresh! refresh_brokers! refresh_topics! refresh_groups! end |
#refresh_brokers! ⇒ Object
56 57 58 |
# File 'app/models/kafka_command/client.rb', line 56 def refresh_brokers! @brokers = initialize_brokers end |
#refresh_groups! ⇒ Object
48 49 50 |
# File 'app/models/kafka_command/client.rb', line 48 def refresh_groups! @groups = initialize_groups end |
#refresh_topics! ⇒ Object
52 53 54 |
# File 'app/models/kafka_command/client.rb', line 52 def refresh_topics! @topics = initialize_topics end |
#topics ⇒ Object
36 37 38 |
# File 'app/models/kafka_command/client.rb', line 36 def topics @topics ||= initialize_topics end |