Module: Kafka::Admin

Defined in:
lib/ext/kafka.rb

Constant Summary collapse

TopicCommandOptions =
TopicCommand::TopicCommandOptions

Class Method Summary collapse

Class Method Details

.assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1, partition = -1) ⇒ Object



115
116
117
118
# File 'lib/ext/kafka.rb', line 115

# Delegates to AdminUtils.assign_replicas_to_brokers and wraps the result
# in a ScalaEnumerable.
#
# Each numeric argument is converted with `to_java(:int)` before being
# handed over; `brokers` is passed through untouched.
#
# @param brokers the broker collection forwarded as-is to AdminUtils
# @param partitions [Integer] converted with `to_java(:int)`
# @param repl_factor [Integer] converted with `to_java(:int)`
# @param index [Integer] converted with `to_java(:int)`; defaults to -1
# @param partition [Integer] converted with `to_java(:int)`; defaults to -1
# @return [ScalaEnumerable] wrapper around the assignment returned by AdminUtils
def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index=-1, partition=-1)
  ScalaEnumerable.new(
    AdminUtils.assign_replicas_to_brokers(
      brokers,
      partitions.to_java(:int),
      repl_factor.to_java(:int),
      index.to_java(:int),
      partition.to_java(:int)
    )
  )
end

.get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ext/kafka.rb', line 120

# Looks up broker metadata through Kafka::Admin::AdminUtils and returns it
# as a Ruby array.
#
# The rack-aware mode object is obtained reflectively: the class
# `kafka.admin.RackAwareMode$Enforced$` (when +force_rack+ is truthy) or
# `kafka.admin.RackAwareMode$Safe$` (otherwise) is loaded through the JRuby
# class loader and its static `MODULE$` field is read.
#
# @param zk_client an object responding to `utils`; the result is passed to AdminUtils
# @param brokers a collection converted to a Scala list and wrapped with `Scala::Option[]`
# @param force_rack [Boolean] selects the Enforced mode when truthy, Safe otherwise
# @return [Array] the metadata sequence converted to a Java list and then `to_a`
def self.get_broker_metadatas(zk_client, brokers, force_rack = true)
  mode_class_name =
    force_rack ? 'kafka.admin.RackAwareMode$Enforced$' : 'kafka.admin.RackAwareMode$Safe$'
  mode_class = JRuby.runtime.jruby_class_loader.load_class(mode_class_name)
  # Passing nil as the receiver reads the field statically.
  rack_aware_mode = mode_class.get_declared_field('MODULE$').get(nil)

  converters = Scala::Collection::JavaConversions
  metadata_seq = Kafka::Admin::AdminUtils.get_broker_metadatas(
    zk_client.utils,
    rack_aware_mode,
    Scala::Option[converters.as_scala_iterable(brokers).to_list]
  )
  converters.seq_as_java_list(metadata_seq).to_a
end

.get_broker_rack(zk_client, broker_id) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/ext/kafka.rb', line 134

# Returns the rack of a single broker, as reported by
# Kafka::Admin.get_broker_metadatas.
#
# NOTE(review): the local variable name was missing from the published
# source of this method (leaving it syntactically invalid);
# `broker_metadata` is a reconstruction — confirm against lib/ext/kafka.rb.
#
# @param zk_client forwarded unchanged to Kafka::Admin.get_broker_metadatas
# @param broker_id the broker to look up; wrapped in a one-element array
# @return the value of `rack.get` for the first metadata entry, or nil when
#   no metadata entry is returned for the broker
# @raise [RuntimeError] when the broker's rack reports `defined?` as false,
#   or when the lookup fails with an AdminOperationException whose message
#   mentions `--disable-rack-aware`
# @raise [Java::KafkaAdmin::AdminOperationException] any other admin failure
#   is re-raised untouched
def self.get_broker_rack(zk_client, broker_id)
  broker_metadata = Kafka::Admin.get_broker_metadatas(zk_client, [broker_id]).first
  return unless broker_metadata

  # The rack object is expected to respond to `defined?` and `get`
  # (presumably a Scala Option — confirm).
  rack = broker_metadata.rack
  unless rack.defined?
    raise "Broker #{broker_metadata.id} is missing rack information, unable to create rack aware shuffle plan."
  end
  rack.get
rescue Java::KafkaAdmin::AdminOperationException => e
  # Only the failure that mentions the rack-aware flag is translated into
  # a friendlier message; anything else propagates as-is.
  raise unless e.message.include?('--disable-rack-aware')

  raise "Not all brokers have rack information. Unable to create rack aware shuffle plan."
end

.preferred_replica(zk_client, topics_partitions) ⇒ Object



111
112
113
# File 'lib/ext/kafka.rb', line 111

# Thin delegator: hands both arguments, unchanged, to
# PreferredReplicaLeaderElectionCommand.write_preferred_replica_election_data.
#
# @param zk_client forwarded as the first argument
# @param topics_partitions forwarded as the second argument
# @return whatever the delegated call returns
def self.preferred_replica(zk_client, topics_partitions)
  PreferredReplicaLeaderElectionCommand
    .write_preferred_replica_election_data(zk_client, topics_partitions)
end

.to_topic_options(hash) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/ext/kafka.rb', line 97

# Turns an options hash into a flat command-line style argument list and
# wraps it in TopicCommandOptions.
#
# Each key becomes a `--flag`, with underscores replaced by dashes. The
# value decides what follows the flag:
# * Hash  - the flag is repeated once per entry, followed by `key=value`
# * Array - the flag is repeated once per element, followed by the element
# * nil   - the flag is emitted alone
# * other - the flag is followed by the value as-is
#
# @param hash [Hash] option names mapped to their values
# @return [TopicCommandOptions] built from the flattened argument list
def self.to_topic_options(hash)
  arguments = hash.each_with_object([]) do |(name, setting), collected|
    flag = "--#{name.to_s.tr('_', '-')}"
    case setting
    when Hash
      setting.each { |k, v| collected << flag << [k, v].join('=') }
    when Array
      setting.each { |element| collected << flag << element }
    else
      collected << flag
      collected << setting unless setting.nil?
    end
  end
  # Nested arrays inside Array values are flattened as well.
  TopicCommandOptions.new(arguments.flatten)
end