Class: KafkaCommand::Topic

Inherits:
Object
  • Object
show all
Defined in:
app/models/kafka_command/topic.rb

Defined Under Namespace

Classes: DeletionError

Constant Summary collapse

# Passed as the `timeout:` argument to the client's delete-topic and
# create-partitions calls. NOTE(review): unit is presumably seconds — confirm
# against the client library.
API_TIMEOUT = 10

# Name of the topic that #destroy refuses to delete.
CONSUMER_OFFSET_TOPIC = '__consumer_offsets'

# Fallback values for the three topic-level settings listed in TOPIC_CONFIGS.
DEFAULT_MAX_MESSAGE_BYTES = 1_000_012
DEFAULT_RETENTION_MS      = 604_800_000 # 7 * 24 * 60 * 60 * 1000
DEFAULT_RETENTION_BYTES   = -1 # NOTE(review): presumably "no size limit" — confirm

# Topic-level configuration keys this model works with.
TOPIC_CONFIGS = %w[max.message.bytes retention.bytes retention.ms].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic_metadata, client) ⇒ Topic

Returns a new instance of Topic.



27
28
29
30
31
# File 'app/models/kafka_command/topic.rb', line 27

# Stores the client that every other method on this object uses for its
# broker calls.
#
# NOTE(review): this listing is incomplete. The first parameter name, the
# assignment on the third line and the statement that follows it were lost
# when the page was extracted. The signature heading names the first
# parameter `topic_metadata`; restore the remaining tokens from
# app/models/kafka_command/topic.rb (line 27) rather than guessing.
#
# @param client the client object kept in @client
def initialize(, client)
  @client         = client
   = 
  
end

Instance Attribute Details

#max_message_bytes ⇒ Object (readonly)

Returns the value of attribute max_message_bytes.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @max_message_bytes; #as_json reports this value
# under config[:max_message_bytes].
#
# @return the value currently held in @max_message_bytes
def max_message_bytes
  @max_message_bytes
end

#name ⇒ Object (readonly) Also known as: id

Returns the value of attribute name.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @name, the value this object passes to the client
# as the topic identifier. Also available as #id.
#
# @return the value currently held in @name
def name
  @name
end

#partitions ⇒ Object (readonly)

Returns the value of attribute partitions.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @partitions, the collection whose elements supply
# `partition_id` to #offsets and #as_json.
#
# @return the value currently held in @partitions
def partitions
  @partitions
end

#replication_factor ⇒ Object (readonly)

Returns the value of attribute replication_factor.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @replication_factor; #brokers_spread divides this
# value by the client's broker count.
#
# @return the value currently held in @replication_factor
def replication_factor
  @replication_factor
end

#retention_bytes ⇒ Object (readonly)

Returns the value of attribute retention_bytes.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @retention_bytes; #as_json reports this value under
# config[:retention_bytes].
#
# @return the value currently held in @retention_bytes
def retention_bytes
  @retention_bytes
end

#retention_ms ⇒ Object (readonly)

Returns the value of attribute retention_ms.



5
6
7
# File 'app/models/kafka_command/topic.rb', line 5

# Read-only accessor for @retention_ms; #as_json reports this value under
# config[:retention_ms].
#
# @return the value currently held in @retention_ms
def retention_ms
  @retention_ms
end

Instance Method Details

#==(other) ⇒ Object



121
122
123
# File 'app/models/kafka_command/topic.rb', line 121

# Compares this topic with another object by name.
#
# Any object that publicly responds to +name+ is compared by that value.
# Everything else (nil, a plain String, ...) is reported as unequal instead
# of raising NoMethodError, which is what callers of `==` expect.
#
# @param other [Object] the object to compare against
# @return [Boolean] true when +other+ exposes the same name as this topic
def ==(other)
  other.respond_to?(:name) && @name == other.name
end

#as_json(include_config: false, **kwargs) ⇒ Object

Accepts (and ignores) extra keyword arguments so that it stays compatible with Rails `as_json` calls.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'app/models/kafka_command/topic.rb', line 103

# Builds a plain-Hash representation of the topic.
#
# Partitions are ordered by their `partition_id` and each one is rendered
# through its own `as_json`. Extra keyword arguments are accepted and ignored
# so the method can be called with the options Rails passes to `as_json`.
#
# @param include_config [Boolean] when truthy, adds a :config entry holding
#   max_message_bytes, retention_ms and retention_bytes
# @return [Hash] :name, :replication_factor, :partitions and, optionally, :config
def as_json(include_config: false, **kwargs)
  ordered_partitions = @partitions.sort_by(&:partition_id)
  payload = {
    name: @name,
    replication_factor: @replication_factor,
    partitions: ordered_partitions.map(&:as_json)
  }
  return payload unless include_config

  payload[:config] = {
    max_message_bytes: max_message_bytes,
    retention_ms: retention_ms,
    retention_bytes: retention_bytes
  }
  payload
end

#brokers_spread ⇒ Object



33
34
35
# File 'app/models/kafka_command/topic.rb', line 33

# Expresses the replication factor as a percentage of the number of brokers
# reported by the client, rounded to the nearest integer.
#
# @return [Integer] replication_factor / broker count * 100, rounded
def brokers_spread
  replicas     = replication_factor.to_f
  broker_count = @client.brokers.count.to_f
  ratio        = replicas / broker_count

  (ratio * 100).round
end

#consumer_offset_topic? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'app/models/kafka_command/topic.rb', line 92

# Tells whether this topic's name equals CONSUMER_OFFSET_TOPIC.
#
# @return [Boolean] result of comparing #name with CONSUMER_OFFSET_TOPIC
def consumer_offset_topic?
  name == CONSUMER_OFFSET_TOPIC
end

#destroy ⇒ Object



37
38
39
40
41
42
43
# File 'app/models/kafka_command/topic.rb', line 37

# Asks the client to delete this topic, using API_TIMEOUT as the timeout.
#
# @raise [DeletionError] when this is the CONSUMER_OFFSET_TOPIC topic; no
#   delete request is sent in that case
# @return whatever the client's `delete_topic` call returns
def destroy
  raise DeletionError, "Cannot delete the #{CONSUMER_OFFSET_TOPIC} topic" if consumer_offset_topic?

  @client.delete_topic(@name, timeout: API_TIMEOUT)
end

#groups ⇒ Object



96
97
98
99
100
# File 'app/models/kafka_command/topic.rb', line 96

# Lists the client's groups whose `consumed_topics` include this topic.
#
# @return [Array] the matching subset of `@client.groups`
def groups
  @client.groups.select { |group| group.consumed_topics.include?(self) }
end

#offset_for(partition) ⇒ Object



74
75
76
77
78
79
80
81
# File 'app/models/kafka_command/topic.rb', line 74

# Resolves the latest offset of one partition of this topic.
#
# A Kafka::UnknownTopicOrPartition error is retried up to three times; the
# fourth consecutive failure is re-raised to the caller.
#
# @param partition an object responding to `partition_id`
# @return whatever the client's `resolve_offset` call returns
# @raise [Kafka::UnknownTopicOrPartition] once the retries are exhausted
def offset_for(partition)
  # The counter must live outside the begin block: `retry` re-runs everything
  # inside it, so initialising it there would reset it on every attempt and
  # the retry limit would never be reached.
  tries = 0

  begin
    @client.resolve_offset(@name, partition.partition_id, :latest)
  rescue Kafka::UnknownTopicOrPartition
    raise if tries >= 3

    tries += 1
    retry
  end
end

#offset_sum ⇒ Object



88
89
90
# File 'app/models/kafka_command/topic.rb', line 88

# Adds up the latest offsets of all partitions, as returned by #offsets.
#
# Uses `sum` rather than `reduce(:+)` so that a topic with no partition
# offsets yields 0 instead of nil.
#
# @return [Integer] total of the values in the #offsets hash (0 when empty)
def offset_sum
  offsets.values.sum
end

#offsets(partition_ids = nil) ⇒ Object



83
84
85
86
# File 'app/models/kafka_command/topic.rb', line 83

# Resolves the latest offsets for a set of this topic's partitions.
#
# @param partition_ids [Array, nil] ids to look up; when nil (or false) the
#   `partition_id` of every entry in @partitions is used
# @return whatever the client's `resolve_offsets` call returns
def offsets(partition_ids = nil)
  ids = partition_ids || @partitions.map(&:partition_id)
  @client.resolve_offsets(@name, ids, :latest)
end

#refresh! ⇒ Object



69
70
71
72
# File 'app/models/kafka_command/topic.rb', line 69

# Re-reads this topic from the client: a client call is made with
# `topics: [@name]` and the first element of its `.topics` result is kept.
#
# NOTE(review): this listing is incomplete. The assignment target, the name
# of the client method being called and the statement on the following line
# were lost when the page was extracted. Restore them from
# app/models/kafka_command/topic.rb (line 69) rather than guessing.
def refresh!
   = @client.(topics: [@name]).topics.first
  
end

#set_configs!(max_message_bytes: nil, retention_ms: nil, retention_bytes: nil) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'app/models/kafka_command/topic.rb', line 45

# Updates the topic-level settings that were supplied, then refreshes this
# object.
#
# Only truthy arguments are sent. When none is given, the client's
# `alter_topic` is not called and the topic is just refreshed.
#
# @param max_message_bytes new value for 'max.message.bytes', or nil to leave it
# @param retention_ms new value for 'retention.ms', or nil to leave it
# @param retention_bytes new value for 'retention.bytes', or nil to leave it
# @return whatever #refresh! returns
def set_configs!(max_message_bytes: nil, retention_ms: nil, retention_bytes: nil)
  requested = {
    'max.message.bytes' => max_message_bytes,
    'retention.ms'      => retention_ms,
    'retention.bytes'   => retention_bytes
  }
  # Drop every setting the caller left unset (nil or false).
  overrides = requested.select { |_key, value| value }

  @client.alter_topic(@name, overrides) unless overrides.empty?
  refresh!
end

#set_partitions!(num_partitions) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'app/models/kafka_command/topic.rb', line 55

# Asks the client to set this topic's partition count to the given number,
# then refreshes this object.
#
# @param num_partitions value forwarded as `num_partitions:` to the client's
#   `create_partitions_for` call
# @raise [UnsupportedApiError] when the client reports that the create
#   partitions API is not supported; nothing is sent in that case
# @return whatever #refresh! returns
def set_partitions!(num_partitions)
  api_available = @client.supports_api?(Kafka::Protocol::CREATE_PARTITIONS_API)
  raise UnsupportedApiError, 'This version of Kafka does not support the create partitions API.' unless api_available

  @client.create_partitions_for(@name, num_partitions: num_partitions, timeout: API_TIMEOUT)
  refresh!
end

#topic_configs ⇒ Object



125
126
127
# File 'app/models/kafka_command/topic.rb', line 125

# Returns the result of `describe`, computing it on first use and caching it
# in @topic_configs for later calls (a nil/false result is not cached).
#
# @return the cached or freshly fetched result of `describe`
def topic_configs
  return @topic_configs if @topic_configs

  @topic_configs = describe
end