Class: KafkaCommand::Topic
- Inherits:
-
Object
- Object
- KafkaCommand::Topic
- Defined in:
- app/models/kafka_command/topic.rb
Defined Under Namespace
Classes: DeletionError
Constant Summary collapse
- API_TIMEOUT =
10
- CONSUMER_OFFSET_TOPIC =
'__consumer_offsets'
- DEFAULT_MAX_MESSAGE_BYTES =
1000012
- DEFAULT_RETENTION_MS =
604800000
- DEFAULT_RETENTION_BYTES =
-1
- TOPIC_CONFIGS =
%w[ max.message.bytes retention.bytes retention.ms ].freeze
Instance Attribute Summary collapse
-
#max_message_bytes ⇒ Object
readonly
Returns the value of attribute max_message_bytes.
-
#name ⇒ Object
(also: #id)
readonly
Returns the value of attribute name.
-
#partitions ⇒ Object
readonly
Returns the value of attribute partitions.
-
#replication_factor ⇒ Object
readonly
Returns the value of attribute replication_factor.
-
#retention_bytes ⇒ Object
readonly
Returns the value of attribute retention_bytes.
-
#retention_ms ⇒ Object
readonly
Returns the value of attribute retention_ms.
Instance Method Summary collapse
- #==(other) ⇒ Object
-
#as_json(include_config: false, **kwargs) ⇒ Object
Needs arguments to be compatible with rails as_json calls.
- #brokers_spread ⇒ Object
- #consumer_offset_topic? ⇒ Boolean
- #destroy ⇒ Object
- #groups ⇒ Object
-
#initialize(topic_metadata, client) ⇒ Topic
constructor
A new instance of Topic.
- #offset_for(partition) ⇒ Object
- #offset_sum ⇒ Object
- #offsets(partition_ids = nil) ⇒ Object
- #refresh! ⇒ Object
- #set_configs!(max_message_bytes: nil, retention_ms: nil, retention_bytes: nil) ⇒ Object
- #set_partitions!(num_partitions) ⇒ Object
- #topic_configs ⇒ Object
Constructor Details
#initialize(topic_metadata, client) ⇒ Topic
Returns a new instance of Topic.
27 28 29 30 31 |
# File 'app/models/kafka_command/topic.rb', line 27 def initialize(topic_metadata, client) @client = client @topic_metadata = topic_metadata end |
Instance Attribute Details
#max_message_bytes ⇒ Object (readonly)
Returns the value of attribute max_message_bytes.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def max_message_bytes @max_message_bytes end |
#name ⇒ Object (readonly) Also known as: id
Returns the value of attribute name.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def name @name end |
#partitions ⇒ Object (readonly)
Returns the value of attribute partitions.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def partitions @partitions end |
#replication_factor ⇒ Object (readonly)
Returns the value of attribute replication_factor.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def replication_factor @replication_factor end |
#retention_bytes ⇒ Object (readonly)
Returns the value of attribute retention_bytes.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def retention_bytes @retention_bytes end |
#retention_ms ⇒ Object (readonly)
Returns the value of attribute retention_ms.
5 6 7 |
# File 'app/models/kafka_command/topic.rb', line 5 def retention_ms @retention_ms end |
Instance Method Details
#==(other) ⇒ Object
121 122 123 |
# File 'app/models/kafka_command/topic.rb', line 121 def ==(other) @name == other.name end |
#as_json(include_config: false, **kwargs) ⇒ Object
Needs arguments to be compatible with rails as_json calls
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'app/models/kafka_command/topic.rb', line 103 def as_json(include_config: false, **kwargs) json = { name: @name, replication_factor: @replication_factor, partitions: @partitions.sort_by(&:partition_id).map(&:as_json) } if include_config json[:config] = { max_message_bytes: max_message_bytes, retention_ms: retention_ms, retention_bytes: retention_bytes } end json end |
#brokers_spread ⇒ Object
33 34 35 |
# File 'app/models/kafka_command/topic.rb', line 33 def brokers_spread ((replication_factor.to_f / @client.brokers.count.to_f) * 100).round end |
#consumer_offset_topic? ⇒ Boolean
92 93 94 |
# File 'app/models/kafka_command/topic.rb', line 92 def consumer_offset_topic? name == CONSUMER_OFFSET_TOPIC end |
#destroy ⇒ Object
37 38 39 40 41 42 43 |
# File 'app/models/kafka_command/topic.rb', line 37 def destroy if name == CONSUMER_OFFSET_TOPIC raise DeletionError, "Cannot delete the #{CONSUMER_OFFSET_TOPIC} topic" end @client.delete_topic(@name, timeout: API_TIMEOUT) end |
#groups ⇒ Object
96 97 98 99 100 |
# File 'app/models/kafka_command/topic.rb', line 96 def groups @client.groups.select do |g| g.consumed_topics.include?(self) end end |
#offset_for(partition) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'app/models/kafka_command/topic.rb', line 74 def offset_for(partition) tries = 0 @client.resolve_offset(@name, partition.partition_id, :latest) rescue Kafka::UnknownTopicOrPartition raise if tries >= 3 tries += 1 retry end |
#offset_sum ⇒ Object
88 89 90 |
# File 'app/models/kafka_command/topic.rb', line 88 def offset_sum offsets.values.reduce(:+) end |
#offsets(partition_ids = nil) ⇒ Object
83 84 85 86 |
# File 'app/models/kafka_command/topic.rb', line 83 def offsets(partition_ids = nil) partition_ids ||= @partitions.map(&:partition_id) @client.resolve_offsets(@name, partition_ids, :latest) end |
#refresh! ⇒ Object
69 70 71 72 |
# File 'app/models/kafka_command/topic.rb', line 69 def refresh! = @client.(topics: [@name]).topics.first end |
#set_configs!(max_message_bytes: nil, retention_ms: nil, retention_bytes: nil) ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'app/models/kafka_command/topic.rb', line 45 def set_configs!(max_message_bytes: nil, retention_ms: nil, retention_bytes: nil) config = {} config['max.message.bytes'] = max_message_bytes if max_message_bytes config['retention.ms'] = retention_ms if retention_ms config['retention.bytes'] = retention_bytes if retention_bytes @client.alter_topic(@name, config) unless config.empty? refresh! end |
#set_partitions!(num_partitions) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'app/models/kafka_command/topic.rb', line 55 def set_partitions!(num_partitions) unless @client.supports_api?(Kafka::Protocol::CREATE_PARTITIONS_API) raise UnsupportedApiError, 'This version of Kafka does not support the create partitions API.' end @client.create_partitions_for( @name, num_partitions: num_partitions, timeout: API_TIMEOUT ) refresh! end |
#topic_configs ⇒ Object
125 126 127 |
# File 'app/models/kafka_command/topic.rb', line 125 def topic_configs @topic_configs ||= describe end |