Module: Kafka::Admin
- Defined in:
- lib/ext/kafka.rb
Constant Summary collapse
- TopicCommandOptions =
TopicCommand::TopicCommandOptions
Class Method Summary collapse
- .assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1, partition = -1) ⇒ Object
- .get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object
- .get_broker_rack(zk_client, broker_id) ⇒ Object
- .preferred_replica(zk_client, topics_partitions) ⇒ Object
- .to_topic_options(hash) ⇒ Object
Class Method Details
.assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1, partition = -1) ⇒ Object
115 116 117 118 |
# File 'lib/ext/kafka.rb', line 115
#
# Delegates to AdminUtils.assign_replicas_to_brokers and wraps whatever it
# returns in a ScalaEnumerable.
#
# @param brokers forwarded to AdminUtils unchanged
# @param partitions converted with to_java(:int) before being forwarded
# @param repl_factor converted with to_java(:int) before being forwarded
# @param index converted with to_java(:int) before being forwarded (default -1)
# @param partition converted with to_java(:int) before being forwarded (default -1)
# @return [ScalaEnumerable] wrapper around the assignment returned by AdminUtils
def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1, partition = -1)
  # All numeric arguments go through the same Java int conversion, in call order.
  java_ints = [partitions, repl_factor, index, partition].map { |number| number.to_java(:int) }
  ScalaEnumerable.new(AdminUtils.assign_replicas_to_brokers(brokers, *java_ints))
end
.get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/ext/kafka.rb', line 120
#
# Looks up broker metadata through Kafka::Admin::AdminUtils and returns it as
# a Ruby array.
#
# @param zk_client object whose #utils is handed to AdminUtils
# @param brokers collection of brokers; converted to a Scala list and wrapped
#   in a Scala::Option before being passed on
# @param force_rack [Boolean] truthy selects RackAwareMode$Enforced$, falsy
#   selects RackAwareMode$Safe$ (default true)
# @return [Array] the broker metadata entries returned by AdminUtils
def self.get_broker_metadatas(zk_client, brokers, force_rack = true)
  mode_class_name = force_rack ? 'kafka.admin.RackAwareMode$Enforced$' : 'kafka.admin.RackAwareMode$Safe$'
  # The mode instance is read reflectively from the class's static MODULE$
  # field (where Scala stores an object's singleton instance).
  rack_aware = JRuby.runtime.jruby_class_loader.load_class(mode_class_name).get_declared_field('MODULE$').get(nil)

  broker_metadatas = Kafka::Admin::AdminUtils.get_broker_metadatas(
    zk_client.utils,
    rack_aware,
    Scala::Option[Scala::Collection::JavaConversions.as_scala_iterable(brokers).to_list]
  )
  Scala::Collection::JavaConversions.seq_as_java_list(broker_metadatas).to_a
end
.get_broker_rack(zk_client, broker_id) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/ext/kafka.rb', line 134
#
# Returns the rack of a single broker, as reported by
# Kafka::Admin.get_broker_metadatas.
#
# @param zk_client forwarded to Kafka::Admin.get_broker_metadatas
# @param broker_id id of the broker to look up
# @return the value held by the broker's rack option, or nil when no metadata
#   entry comes back for the broker
# @raise [RuntimeError] when the broker's rack option is not defined, or when
#   the lookup fails with an AdminOperationException whose message mentions
#   '--disable-rack-aware'
# @raise [Java::KafkaAdmin::AdminOperationException] any other admin failure,
#   re-raised unchanged
def self.get_broker_rack(zk_client, broker_id)
  broker_metadata = Kafka::Admin.get_broker_metadatas(zk_client, [broker_id]).first
  if broker_metadata
    rack = broker_metadata.rack
    unless rack.defined?
      raise "Broker #{broker_metadata.id} is missing rack information, unable to create rack aware shuffle plan."
    end
    rack.get
  end
rescue Java::KafkaAdmin::AdminOperationException => e
  # Replace the error that points at the '--disable-rack-aware' flag with a
  # message about the shuffle plan; everything else propagates untouched.
  if e.message.include? '--disable-rack-aware'
    raise "Not all brokers have rack information. Unable to create rack aware shuffle plan."
  else
    raise
  end
end
.preferred_replica(zk_client, topics_partitions) ⇒ Object
111 112 113 |
# File 'lib/ext/kafka.rb', line 111
#
# Hands the given topic partitions to
# PreferredReplicaLeaderElectionCommand.write_preferred_replica_election_data.
#
# @param zk_client forwarded unchanged as the first argument
# @param topics_partitions forwarded unchanged as the second argument
# @return whatever write_preferred_replica_election_data returns
def self.preferred_replica(zk_client, topics_partitions)
  election_command = PreferredReplicaLeaderElectionCommand
  election_command.write_preferred_replica_election_data(zk_client, topics_partitions)
end
.to_topic_options(hash) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/ext/kafka.rb', line 97
#
# Turns a Ruby hash into a flat command-line style argument list and wraps it
# in a TopicCommandOptions.
#
# Each key becomes a flag: it is stringified, prefixed with '--' and its
# underscores are replaced with dashes. The value decides what follows:
# * Hash  - the flag is repeated once per entry, each followed by "k=v"
# * Array - the flag is repeated once per element, each followed by the element
# * nil   - the flag is emitted alone
# * other - the flag is followed by the value
#
# @param hash [Hash] option names mapped to values
# @return [TopicCommandOptions] built from the flattened argument list
def self.to_topic_options(hash)
  options = hash.flat_map do |key, value|
    kafka_key = "--#{key.to_s.tr('_', '-')}"
    case value
    when Hash then value.map { |k, v| [kafka_key, [k, v].join('=')] }
    when Array then value.map { |v| [kafka_key, v] }
    else [kafka_key, value].compact
    end
  end
  # flat_map removes only one level of nesting; the Hash and Array branches
  # still leave [flag, value] pairs behind, hence the final flatten.
  TopicCommandOptions.new(options.flatten)
end