Class: Ktl::MigrationPlan
- Inherits: Object
- Inheritance chain: Object → Ktl::MigrationPlan
- Defined in:
- lib/ktl/migration_plan.rb
Instance Method Summary
- #generate ⇒ Object
-
#initialize(zk_client, from_brokers, to_brokers, options = {}) ⇒ MigrationPlan
constructor
A new instance of MigrationPlan.
Constructor Details
#initialize(zk_client, from_brokers, to_brokers, options = {}) ⇒ MigrationPlan
Returns a new instance of MigrationPlan.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/ktl/migration_plan.rb', line 5
#
# Sets up a plan for moving replicas from one list of brokers to another.
# Brokers are paired by position: from_brokers[i] is replaced by to_brokers[i].
#
# @param zk_client the client handed to Kafka::Admin.get_broker_rack and
#   stored for later use (NOTE(review): concrete type is not visible here)
# @param from_brokers list of broker ids to migrate away from; must respond
#   to #length, #& and #map (presumably an Array of Integers — confirm)
# @param to_brokers list of broker ids to migrate to; same length as
#   from_brokers and sharing no element with it
# @param options [Hash] optional settings
# @option options :logger logger to use; a NullLogger is created when absent
# @option options :log_plan any truthy value enables plan logging
# @raise [ArgumentError] when the lists differ in length, when they overlap,
#   or when the source brokers have rack information and the per-position
#   racks of the two lists differ
def initialize(zk_client, from_brokers, to_brokers, options = {})
  @zk_client = zk_client
  @from_brokers = from_brokers
  @to_brokers = to_brokers
  if @from_brokers.length != @to_brokers.length
    raise ArgumentError, "Both brokers lists must be of equal length. From: #{@from_brokers}, To: #{@to_brokers}"
  elsif !(@from_brokers & @to_brokers).empty?
    raise ArgumentError, "Broker lists must be mutually exclusive. From: #{@from_brokers}, To: #{@to_brokers}"
  end
  from_racks = from_brokers.map { |broker_id| Kafka::Admin.get_broker_rack(zk_client, broker_id) }
  to_racks = to_brokers.map { |broker_id| Kafka::Admin.get_broker_rack(zk_client, broker_id) }
  # The rack comparison is only enforced when at least one source broker
  # reports a rack; if every source rack is nil the check is skipped.
  if from_racks != to_racks && from_racks.compact.any?
    raise ArgumentError, "Both broker lists must have the same rack setup. From: #{from_racks}, To: #{to_racks}"
  end
  @logger = options[:logger] || NullLogger.new
  # Double negation coerces whatever was passed into a strict true/false.
  @log_plan = !!options[:log_plan]
end
Instance Method Details
#generate ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/ktl/migration_plan.rb', line 23
#
# Builds the reassignment plan for every topic known to the ZooKeeper client.
# For each assignment entry, every broker in @from_brokers that appears in the
# replica list is swapped for the broker at the same position in @to_brokers.
# Only entries whose replica list actually changed are added to the plan.
#
# @return the plan: starts as Scala::Collection::Map.empty and is extended
#   with one Scala::Tuple (topic partition, new replicas) per changed entry
def generate
  plan = Scala::Collection::Map.empty
  all_topics = @zk_client.all_topics
  current_assignments = ScalaEnumerable.new(@zk_client.replica_assignment_for_topics(all_topics))
  current_assignments.each do |entry|
    # Each entry exposes #first / #last — presumably a
    # (topic partition, replica list) pair; confirm against the client API.
    topic_partition = entry.first
    replicas = entry.last
    # Fold over the broker pairs, threading the progressively updated replica
    # list through so later replacements see earlier ones.
    migrated = @from_brokers.each_with_index.inject(replicas) do |acc, (from_broker, index)|
      to_broker = @to_brokers[index]
      next acc unless acc.contains?(from_broker)

      acc.updated(acc.index_of(from_broker), to_broker, CanBuildFrom)
    end
    next if replicas == migrated

    @logger.debug "Moving #{topic_partition.topic},#{topic_partition.partition} from #{replicas} to #{migrated}" if @log_plan
    plan += Scala::Tuple.new(topic_partition, migrated)
  end
  plan
end