Class: Kafkat::Command::Drain
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #build_partitions_by_broker(topic, destination_brokers) ⇒ Object
  Builds a hash mapping each broker ID to the number of partitions of the topic hosted on that broker, so that the broker with the fewest partitions can be found when balancing brokers.
- #generate_assignments(source_broker, topics, destination_brokers) ⇒ Object
- #run ⇒ Object
  For each partition (of the specified topic, or of every topic if none is given) on the source broker, assigns the partition to a destination broker that does not already host it, keeping the partition's existing replicas in place to minimize data movement.
Methods inherited from Base
#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper
Methods included from Kafkat::CommandIO
#prompt_and_execute_assignments
Methods included from Formatting
#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header
Constructor Details
This class inherits a constructor from Kafkat::Command::Base
Instance Method Details
#build_partitions_by_broker(topic, destination_brokers) ⇒ Object
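Builds a hash mapping each broker ID to the number of partitions of the topic hosted on that broker, so that the broker with the fewest partitions can be found when balancing brokers. Every destination broker appears in the hash, with a count of 0 if it hosts no partitions of the topic.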
# File 'lib/kafkat/command/drain.rb', line 87

def build_partitions_by_broker(topic, destination_brokers)
  partitions_by_broker = Hash.new(0)
  # Seed every destination broker with 0 so brokers that hold no
  # partitions of this topic are still candidates.
  destination_brokers.each { |id| partitions_by_broker[id] = 0 }
  # Count one per replica hosted by each broker.
  topic.partitions.each do |p|
    p.replicas.each do |r|
      partitions_by_broker[r] += 1
    end
  end
  partitions_by_broker
end
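To see what the resulting hash looks like, here is a minimal sketch that runs the same counting logic against plain Ruby structs. `Partition`, `Topic`, and `count_replicas_by_broker` are hypothetical stand-ins, not Kafkat's own classes; they assume only that a topic exposes `partitions` and a partition exposes `replicas` (an array of broker IDs).

```ruby
# Hypothetical stand-ins for Kafkat's topic/partition objects; only the
# fields read by the counting logic are modeled.
Partition = Struct.new(:id, :replicas)
Topic = Struct.new(:name, :partitions)

# Same logic as build_partitions_by_broker: seed every destination broker
# with 0, then add one for each replica a broker hosts.
def count_replicas_by_broker(topic, destination_brokers)
  counts = Hash.new(0)
  destination_brokers.each { |id| counts[id] = 0 }
  topic.partitions.each do |part|
    part.replicas.each { |r| counts[r] += 1 }
  end
  counts
end

topic = Topic.new("events", [
  Partition.new(0, [1, 2]),
  Partition.new(1, [2, 3]),
  Partition.new(2, [1, 3])
])

counts = count_replicas_by_broker(topic, [2, 3, 4])
puts counts[4]  # prints 0: destination broker 4 holds no replicas yet
puts counts[1]  # prints 2: non-destination brokers are counted too
```

Broker 1 is counted even though it is not a destination. That is harmless in the original code, because the minimum is taken only over candidate destination brokers.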
#generate_assignments(source_broker, topics, destination_brokers) ⇒ Object
# File 'lib/kafkat/command/drain.rb', line 51

def generate_assignments(source_broker, topics, destination_brokers)
  assignments = []
  topics.each do |_, t|
    # Per-topic partition counts, used to pick the least-loaded target.
    partitions_by_broker = build_partitions_by_broker(t, destination_brokers)
    t.partitions.each do |p|
      if p.replicas.include? source_broker
        # Keep the existing replicas; only the source broker is replaced.
        replicas = p.replicas - [source_broker]
        source_broker_is_leader = p.replicas.first == source_broker
        # Candidates: destination brokers that do not already hold this partition.
        potential_broker_ids = destination_brokers - replicas
        if potential_broker_ids.empty?
          print "ERROR: Not enough destination brokers to reassign topic \"#{t.name}\".\n"
          exit 1
        end
        # Choose the candidate hosting the fewest partitions of this topic.
        num_partitions_on_potential_broker = partitions_by_broker.select { |id, _| potential_broker_ids.include? id }
        assigned_broker_id = num_partitions_on_potential_broker.min_by { |id, num| num }[0]
        # A replacement for the leader goes first so it becomes the preferred leader.
        if source_broker_is_leader
          replicas.unshift(assigned_broker_id)
        else
          replicas << assigned_broker_id
        end
        partitions_by_broker[assigned_broker_id] += 1
        assignments << Assignment.new(t.name, p.id, replicas)
      end
    end
  end
  assignments
end
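The greedy selection and leader handling can be tried in isolation with the sketch below. It is a simplified, single-topic re-expression of the logic above, not Kafkat's code: `Partition`, `Topic`, `Reassignment`, and `drain_topic` are hypothetical names, and the sketch raises `ArgumentError` where the original prints an error and calls `exit 1`.

```ruby
# Hypothetical stand-ins for Kafkat's topic, partition, and assignment
# objects; only the fields the drain logic reads or writes are modeled.
Partition = Struct.new(:id, :replicas)
Topic = Struct.new(:name, :partitions)
Reassignment = Struct.new(:topic_name, :partition_id, :replicas)

# Greedy drain of one topic: in every replica list containing `source`,
# replace it with the least-loaded destination broker not already present.
def drain_topic(source, topic, destinations)
  # Partition counts per broker, with every destination seeded at 0.
  counts = Hash.new(0)
  destinations.each { |id| counts[id] = 0 }
  topic.partitions.each { |part| part.replicas.each { |r| counts[r] += 1 } }

  topic.partitions.each_with_object([]) do |part, result|
    next unless part.replicas.include?(source)

    replicas = part.replicas - [source]
    candidates = destinations - replicas
    if candidates.empty?
      raise ArgumentError, "not enough destination brokers for #{topic.name}"
    end

    # min_by keeps the first minimum, so ties go to the earlier destination.
    chosen = candidates.min_by { |id| counts[id] }

    # A leader replacement goes to the front of the replica list.
    if part.replicas.first == source
      replicas.unshift(chosen)
    else
      replicas << chosen
    end
    counts[chosen] += 1 # later partitions see the updated load
    result << Reassignment.new(topic.name, part.id, replicas)
  end
end

topic = Topic.new("events", [
  Partition.new(0, [1, 2]),
  Partition.new(1, [2, 1]),
  Partition.new(2, [3, 1]),
  Partition.new(3, [1, 3])
])

drain_topic(1, topic, [2, 3, 4]).each do |a|
  puts "#{a.topic_name}/#{a.partition_id}: #{a.replicas.inspect}"
end
# events/0: [4, 2]
# events/1: [2, 4]
# events/2: [3, 2]
# events/3: [4, 3]
```

Broker 4 starts empty and absorbs the first two moves. After that, broker 2 wins a tie because it is listed before broker 4. `#generate_assignments` applies the same steps to every topic in turn and rebuilds the counts for each one.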
#run ⇒ Object
For each partition (of the specified topic, or of every topic if none is given) on the source broker, the command reassigns the partition to a destination broker that does not already host it, keeping the partition's existing replicas in place to minimize data movement. To distribute data evenly, if more than one destination broker meets this requirement, the command chooses the one hosting the fewest partitions of the topic involved.
To find that broker, the command maintains a hash table keyed by broker ID, with the number of partitions as the value. After each assignment it increments the chosen broker's count, so later partitions of the same topic see the updated load.
# File 'lib/kafkat/command/drain.rb', line 19

def run
  source_broker = ARGV[0] && ARGV.shift.to_i
  if source_broker.nil?
    puts "You must specify a broker ID."
    exit 1
  end

  opts = Trollop.options do
    opt :brokers, "destination broker IDs", type: :string
    opt :topic, "topic name to reassign", type: :string
  end

  # Reassign only the named topic, or every topic if none is given.
  topic_name = opts[:topic]
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  # Destination brokers default to every broker known to ZooKeeper, minus the source.
  destination_brokers = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
  destination_brokers ||= zookeeper.get_brokers.values.map(&:id)
  destination_brokers.delete(source_broker)

  # Refuse to move data onto brokers that are not currently active.
  active_brokers = zookeeper.get_brokers.values.map(&:id)
  unless (inactive_brokers = destination_brokers - active_brokers).empty?
    print "ERROR: Broker #{inactive_brokers} are not currently active.\n"
    exit 1
  end

  assignments = generate_assignments(source_broker, topics, destination_brokers)
  prompt_and_execute_assignments(assignments)
end
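The destination-broker handling in `#run` can be exercised without ZooKeeper or Trollop. The sketch below uses a hypothetical helper, `resolve_destinations`, that takes the `--brokers` option string (or `nil`) and the list of active broker IDs as plain arguments. It raises `ArgumentError` where `#run` prints an error and exits.

```ruby
# Hypothetical helper mirroring how #run resolves destination brokers.
# `brokers_opt` plays the role of the --brokers option (or nil), and
# `active_brokers` the broker IDs that #run reads from ZooKeeper.
def resolve_destinations(source, brokers_opt, active_brokers)
  # An explicit comma-separated list wins; otherwise use every active broker.
  # dup so that deleting the source does not mutate the caller's array.
  destinations = brokers_opt ? brokers_opt.split(',').map(&:to_i) : active_brokers.dup
  destinations.delete(source) # never drain a broker onto itself
  inactive = destinations - active_brokers
  unless inactive.empty?
    raise ArgumentError, "brokers #{inactive.join(', ')} are not currently active"
  end
  destinations
end

p resolve_destinations(1, "2,3,4", [1, 2, 3, 4])  # prints [2, 3, 4]
p resolve_destinations(1, nil, [1, 2, 3])         # prints [2, 3]

begin
  resolve_destinations(1, "2,5", [1, 2, 3])
rescue ArgumentError => e
  puts e.message  # prints: brokers 5 are not currently active
end
```

Because of `to_i`, a non-numeric entry such as `"x"` becomes broker 0. It is reported as inactive only if no broker with ID 0 is active.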