Class: Ktl::ShufflePlan
- Inherits:
-
Object
- Object
- Ktl::ShufflePlan
- Defined in:
- lib/ktl/shuffle_plan.rb
Direct Known Subclasses
Instance Method Summary
- #generate ⇒ Object
- #generate_for_new_topic(topic, partition_count) ⇒ Object
- #initialize(zk_client, options = {}) ⇒ ShufflePlan (constructor): A new instance of ShufflePlan.
Constructor Details
#initialize(zk_client, options = {}) ⇒ ShufflePlan
Returns a new instance of ShufflePlan.
5 6 7 8 9 10 |
# File 'lib/ktl/shuffle_plan.rb', line 5
#
# Builds a plan generator around a ZooKeeper client and an options hash.
#
# @param zk_client [Object] client stored as-is in @zk_client; the other
#   methods of this class query it for topics and replica assignments
# @param options [Hash] stored as-is in @options
# @option options [Object] :logger logger to use; when the value is missing
#   or falsy a fresh NullLogger is created instead
# @option options [Object] :log_plan coerced to a strict boolean with `!!`
#   and kept in @log_plan
# @return [ShufflePlan] a new instance of ShufflePlan
def initialize(zk_client, options = {})
  @zk_client = zk_client
  @options = options
  @logger = options[:logger] || NullLogger.new
  # Double negation turns any truthy/falsy option value into true/false.
  @log_plan = !!options[:log_plan]
end
Instance Method Details
#generate ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/ktl/shuffle_plan.rb', line 12
#
# Computes a reassignment plan for the topics known to the ZooKeeper client.
#
# Topics are optionally narrowed by @options[:filter] (matched with
# `String#match`), visited in order of their first tuple element, and each
# partition is given a fresh replica list by `assign_replicas_to_brokers`.
# Only partitions whose new replica list differs from the current one end
# up in the plan.
#
# @return [Object] the map started from `Scala::Collection::Map.empty` and
#   extended with one (TopicAndPartition, replicas) tuple per moved partition
def generate
  topic_names = @zk_client.all_topics
  pattern = @options[:filter]
  topic_names = topic_names.filter { |name| !!name.match(pattern) } if pattern

  partitions_by_topic = ScalaEnumerable.new(@zk_client.partitions_for_topics(topic_names)).sort_by(&:first)
  current_replicas = @zk_client.replica_assignment_for_topics(topic_names)
  broker_pool = select_brokers
  plan = Scala::Collection::Map.empty

  partitions_by_topic.each do |entry|
    topic, partitions = entry.elements
    # Without an explicit replication factor, reuse the replica count that
    # partition 0 of the topic currently has.
    replica_count = @options[:replication_factor]
    replica_count ||= current_replicas.apply(Kafka::TopicAndPartition.new(topic, 0)).size

    assign_replicas_to_brokers(topic, broker_pool, partitions.size, replica_count).each do |pair|
      partition, target = pair.elements
      key = Kafka::TopicAndPartition.new(topic, partition)
      existing = current_replicas.apply(key)
      # Partitions that already sit on the target replicas are left out.
      next if existing == target

      if @log_plan
        @logger.info "Moving #{key.topic},#{key.partition} from #{existing} to #{target}"
      end
      plan += Scala::Tuple.new(key, target)
    end
  end

  plan
end
#generate_for_new_topic(topic, partition_count) ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/ktl/shuffle_plan.rb', line 39
#
# Produces replica lists for a topic, one per entry returned by
# `assign_replicas_to_brokers`.
#
# @param topic [Object] topic identifier, passed through to
#   `assign_replicas_to_brokers`
# @param partition_count [Object] number of partitions to assign, passed
#   through unchanged
# @return [Object] result of mapping each assignment entry to its replicas
#   converted with `as_java_iterable(...).to_a`
def generate_for_new_topic(topic, partition_count)
  broker_pool = select_brokers
  # A missing (or falsy) replication factor falls back to one replica.
  replica_count = @options[:replication_factor] || 1
  assign_replicas_to_brokers(topic, broker_pool, partition_count, replica_count).map do |pair|
    # Only the replica list is needed; the partition id is discarded.
    _partition, replicas = pair.elements
    Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a
  end
end