Class: Kafkat::Command::Reassign
- Defined in:
- lib/kafkat/command/reassign.rb
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
Methods inherited from Base
#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper
Methods included from Kafkat::CommandIO
#prompt_and_execute_assignments
Methods included from Formatting
#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header
Constructor Details
This class inherits a constructor from Kafkat::Command::Base
Instance Method Details
#run ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/kafkat/command/reassign.rb', line 9

# Builds a new replica assignment for every partition of the selected topics
# and hands the resulting list to +prompt_and_execute_assignments+.
#
# Input is read from ARGV:
# * an optional leading comma-separated list of topic names; when it is
#   absent (or the first argument starts with "--") all topics are used;
# * --brokers   comma-separated broker IDs to use as the replica set
#               (defaults to every broker returned by zookeeper.get_brokers);
# * --replicas  replica count per partition (defaults, per topic, to the
#               replica count of that topic's first partition).
#
# Prints an error and exits with status 1 when a requested broker ID is not
# among the brokers returned by zookeeper.get_brokers, or when the replica
# count is larger than the number of brokers in the replica set.
# Topics that have no partitions are skipped.
def run
  # A leading argument that is not an option flag is the topic list.
  topic_names = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

  all_brokers = zookeeper.get_brokers

  # Fall back to every topic when no list was given (or the lookup
  # returned nothing truthy).
  topics = (topic_names && zookeeper.get_topics(topic_names.split(','))) ||
           zookeeper.get_topics

  opts = Trollop.options do
    opt :brokers, "replica set (broker IDs)", type: :string
    opt :replicas, "number of replicas (count)", type: :integer
  end

  # Reuse the broker list fetched above so the default replica set and the
  # set used for validation always come from the same snapshot.
  active_broker_ids = all_brokers.values.map(&:id)
  broker_ids = opts[:brokers] ? opts[:brokers].split(',').map(&:to_i) : active_broker_ids
  replica_count = opts[:replicas]

  broker_ids.each do |id|
    next if active_broker_ids.include?(id)

    print "ERROR: Broker #{id} is not currently active.\n"
    exit 1
  end

  # *** This logic is duplicated from Kafka 0.8.1.1 ***
  assignments = []
  broker_count = broker_ids.size

  topics.each do |_, topic|
    # Nothing to assign, and no first partition to read a replica count from.
    next if topic.partitions.empty?

    # This is how Kafka's AdminUtils determines these values.
    topic_replica_count = replica_count || topic.partitions[0].replicas.size

    if topic_replica_count > broker_count
      print "ERROR: Replication factor (#{topic_replica_count}) is larger than brokers (#{broker_count}).\n"
      exit 1
    end

    # Random starting offsets, chosen once per topic.
    start_index = Random.rand(broker_count)
    replica_shift = Random.rand(broker_count)

    topic.partitions.each do |partition|
      # Bump the shift each time the partition IDs wrap around the broker list.
      replica_shift += 1 if partition.id > 0 && partition.id % broker_count == 0
      first_replica_index = (partition.id + start_index) % broker_count

      replicas = [broker_ids[first_replica_index]]

      (0...topic_replica_count - 1).each do |i|
        # shift is in 1..broker_count-1, so a follower never lands on the
        # first replica's broker.
        shift = 1 + (replica_shift + i) % (broker_count - 1)
        index = (first_replica_index + shift) % broker_count
        replicas << broker_ids[index]
      end

      replicas.reverse!
      assignments << Assignment.new(topic.name, partition.id, replicas)
    end
  end
  # ****************

  prompt_and_execute_assignments(assignments)
end