Class: KafkaCommand::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
app/models/kafka_command/client.rb

Constant Summary collapse

CLUSTER_METHOD_DELGATIONS =
%i(
  broker_pool
  delete_topic
  alter_topic
  describe_topic
  create_partitions_for
  resolve_offset
  resolve_offsets
  describe_group
  supports_api?
).freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(brokers:, **kwargs) ⇒ Client

Returns a new instance of Client.



25
26
27
28
# File 'app/models/kafka_command/client.rb', line 25

def initialize(brokers:, **kwargs)
  @client  = Kafka.new(brokers, **kwargs)
  @cluster = @client.cluster
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



21
22
23
# File 'app/models/kafka_command/client.rb', line 21

def client
  @client
end

#clusterObject (readonly)

Returns the value of attribute cluster.



21
22
23
# File 'app/models/kafka_command/client.rb', line 21

def cluster
  @cluster
end

Instance Method Details

#brokersObject



40
41
42
# File 'app/models/kafka_command/client.rb', line 40

def brokers
  @brokers ||= initialize_brokers
end

#connect_to_broker(host:, port:, broker_id:) ⇒ Object



73
74
75
# File 'app/models/kafka_command/client.rb', line 73

def connect_to_broker(host:, port:, broker_id:)
  Broker.new(broker_pool.connect(host, port, node_id: broker_id))
end

#fetch_metadata(topics: nil) ⇒ Object



60
61
62
# File 'app/models/kafka_command/client.rb', line 60

def (topics: nil)
  brokers.sample.(topics: topics)
end

#find_topic(topic_name) ⇒ Object



69
70
71
# File 'app/models/kafka_command/client.rb', line 69

def find_topic(topic_name)
  topics.find { |t| t.name == topic_name }
end

#get_group_coordinator(group_id:) ⇒ Object



64
65
66
67
# File 'app/models/kafka_command/client.rb', line 64

def get_group_coordinator(group_id:)
  broker = @cluster.get_group_coordinator(group_id: group_id)
  Broker.new(broker)
end

#groupsObject



44
45
46
# File 'app/models/kafka_command/client.rb', line 44

def groups
  @groups ||= initialize_groups
end

#refresh!Object



30
31
32
33
34
# File 'app/models/kafka_command/client.rb', line 30

def refresh!
  refresh_brokers!
  refresh_topics!
  refresh_groups!
end

#refresh_brokers!Object



56
57
58
# File 'app/models/kafka_command/client.rb', line 56

def refresh_brokers!
  @brokers = initialize_brokers
end

#refresh_groups!Object



48
49
50
# File 'app/models/kafka_command/client.rb', line 48

def refresh_groups!
  @groups = initialize_groups
end

#refresh_topics!Object



52
53
54
# File 'app/models/kafka_command/client.rb', line 52

def refresh_topics!
  @topics = initialize_topics
end

#topicsObject



36
37
38
# File 'app/models/kafka_command/client.rb', line 36

def topics
  @topics ||= initialize_topics
end