Class: KafkaCommand::Cluster

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
app/models/kafka_command/cluster.rb

Constant Summary collapse

DEFAULT_PROTOCOL =
'PLAINTEXT'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, seed_brokers:, description: nil, protocol: DEFAULT_PROTOCOL, sasl_scram_username: nil, sasl_scram_password: nil, ssl_ca_cert_file_path: nil, ssl_client_cert_file_path: nil, ssl_client_cert_key_file_path: nil, version: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, connect_timeout: nil, socket_timeout: nil) ⇒ Cluster

Returns a new instance of Cluster.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'app/models/kafka_command/cluster.rb', line 24

def initialize(name:, seed_brokers:, description: nil, protocol: DEFAULT_PROTOCOL,
               sasl_scram_username: nil, sasl_scram_password: nil, ssl_ca_cert_file_path: nil,
               ssl_client_cert_file_path: nil, ssl_client_cert_key_file_path: nil, version: nil,
               ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, connect_timeout: nil,
               socket_timeout: nil
              )
  @name = name
  @seed_brokers = seed_brokers
  @description = description
  @sasl_scram_username = sasl_scram_username
  @sasl_scram_password = sasl_scram_password
  @ssl_ca_cert = ssl_ca_cert
  @ssl_ca_cert_file_path = ssl_ca_cert_file_path
  @ssl_client_cert = ssl_client_cert
  @ssl_client_cert_file_path = ssl_client_cert_file_path
  @ssl_client_cert_key = ssl_client_cert_key
  @ssl_client_cert_key_file_path = ssl_client_cert_key_file_path
  @version = version
  @connect_timeout = connect_timeout
  @socket_timeout = socket_timeout
  @client = initialize_client
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def client
  @client
end

#connect_timeoutObject (readonly)

Returns the value of attribute connect_timeout.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def connect_timeout
  @connect_timeout
end

#descriptionObject (readonly)

Returns the value of attribute description.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def description
  @description
end

#nameObject (readonly) Also known as: id

Returns the value of attribute name.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def name
  @name
end

#protocolObject (readonly)

Returns the value of attribute protocol.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def protocol
  @protocol
end

#sasl_scram_passwordObject (readonly)

Returns the value of attribute sasl_scram_password.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def sasl_scram_password
  @sasl_scram_password
end

#sasl_scram_usernameObject (readonly)

Returns the value of attribute sasl_scram_username.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def sasl_scram_username
  @sasl_scram_username
end

#socket_timeoutObject (readonly)

Returns the value of attribute socket_timeout.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def socket_timeout
  @socket_timeout
end

#versionObject (readonly)

Returns the value of attribute version.



10
11
12
# File 'app/models/kafka_command/cluster.rb', line 10

def version
  @version
end

Class Method Details

.allObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'app/models/kafka_command/cluster.rb', line 95

def self.all
  KafkaCommand.config.clusters.map do |name, cluster_info|
    new(
      name: name,
      seed_brokers: cluster_info['seed_brokers'],
      protocol: cluster_info['protocol'],
      description: cluster_info['description'],
      connect_timeout: cluster_info['connect_timeout'],
      socket_timeout: cluster_info['socket_timeout'],
      sasl_scram_username: cluster_info['sasl_scram_username'],
      sasl_scram_password: cluster_info['sasl_scram_password'],
      ssl_ca_cert: cluster_info['ssl_ca_cert'],
      ssl_ca_cert_file_path: cluster_info['ssl_ca_cert_file_path'],
      ssl_client_cert: cluster_info['ssl_client_cert'],
      ssl_client_cert_file_path: cluster_info['ssl_client_cert_file_path'],
      ssl_client_cert_key: cluster_info['ssl_client_cert_key'],
      ssl_client_cert_key_file_path: cluster_info['ssl_client_cert_key_file_path'],
    )
  end
end

.countObject



91
92
93
# File 'app/models/kafka_command/cluster.rb', line 91

def self.count
  all.count
end

.find(cluster_name) ⇒ Object



83
84
85
# File 'app/models/kafka_command/cluster.rb', line 83

def self.find(cluster_name)
  all.find { |c| c.name == cluster_name }
end

.none?Boolean

Returns:

  • (Boolean)


87
88
89
# File 'app/models/kafka_command/cluster.rb', line 87

def self.none?
  all.none?
end

Instance Method Details

#==(other) ⇒ Object



79
80
81
# File 'app/models/kafka_command/cluster.rb', line 79

def ==(other)
  name == other.name
end

#connected?Boolean

Returns:

  • (Boolean)


47
48
49
50
# File 'app/models/kafka_command/cluster.rb', line 47

def connected?
  # Tried using all?(&:connected?) here, but was getting some weird behavior with the views
  brokers.map(&:connected?).all?
end

#create_topic(name, **kwargs) ⇒ Object



52
53
54
55
56
57
58
59
60
61
# File 'app/models/kafka_command/cluster.rb', line 52

def create_topic(name, **kwargs)
  client.create_topic(name, **kwargs)

  # Give the cluster time to know about the topic
  3.times do
    client.refresh_topics!
    topic = topics.find { |t| t.name == name }
    return topic if topic
  end
end

#sasl?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'app/models/kafka_command/cluster.rb', line 75

def sasl?
  sasl_scram_username.present? && sasl_scram_password.present?
end

#ssl?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'app/models/kafka_command/cluster.rb', line 71

def ssl?
  get_ssl_ca_cert.present?
end

#to_humanObject



63
64
65
# File 'app/models/kafka_command/cluster.rb', line 63

def to_human
  name.humanize.capitalize
end

#to_sObject



67
68
69
# File 'app/models/kafka_command/cluster.rb', line 67

def to_s
  name
end