Class: Ktl::ShufflePlan

Inherits:
Object
  • Object
show all
Defined in:
lib/ktl/shuffle_plan.rb

Direct Known Subclasses

RendezvousShufflePlan

Instance Method Summary collapse

Constructor Details

#initialize(zk_client, options = {}) ⇒ ShufflePlan

Returns a new instance of ShufflePlan.



5
6
7
8
9
10
# File 'lib/ktl/shuffle_plan.rb', line 5

# Captures the collaborators and settings used by the plan generators.
#
# @param zk_client [Object] client handle stored as-is for later lookups
# @param options [Hash] settings hash, kept whole in @options; this
#   constructor itself reads :logger and :log_plan
# @option options [Object] :logger receiver for plan log messages; when it is
#   absent (or falsy) a fresh NullLogger is used instead
# @option options [Object] :log_plan any truthy value enables plan logging;
#   the value is coerced to a strict true/false
def initialize(zk_client, options = {})
  @zk_client, @options = zk_client, options
  @log_plan = options[:log_plan] ? true : false
  @logger = options[:logger] || NullLogger.new
end

Instance Method Details

#generate ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/ktl/shuffle_plan.rb', line 12

# Builds a reassignment plan for existing topics.
#
# Topics come from the zk client and are narrowed by @options[:filter] when
# one is given. Topic/partition entries are processed in sort_by(&:first)
# order. For each topic a fresh assignment is requested from
# assign_replicas_to_brokers; only partitions whose current replica list
# differs from the freshly assigned one are added to the plan.
#
# The replica count per topic is @options[:replication_factor] when set,
# otherwise the size of the current assignment of that topic's partition 0.
#
# @return [Object] the map started from Scala::Collection::Map.empty with one
#   (topic-partition, replicas) tuple added per partition that has to move
def generate
  topics = @zk_client.all_topics
  filter = @options[:filter]
  topics = topics.filter { |name| !!name.match(filter) } if filter
  sorted_partitions = ScalaEnumerable.new(@zk_client.partitions_for_topics(topics)).sort_by(&:first)
  current_replicas = @zk_client.replica_assignment_for_topics(topics)
  brokers = select_brokers
  plan = Scala::Collection::Map.empty
  sorted_partitions.each do |entry|
    topic, partitions = entry.elements
    # Only consult the current assignment when no explicit factor was given.
    replica_count = @options[:replication_factor]
    replica_count ||= current_replicas.apply(Kafka::TopicAndPartition.new(topic, 0)).size
    assign_replicas_to_brokers(topic, brokers, partitions.size, replica_count).each do |pair|
      partition, wanted = pair.elements
      key = Kafka::TopicAndPartition.new(topic, partition)
      existing = current_replicas.apply(key)
      # Partitions already placed as desired stay out of the plan.
      next if existing == wanted
      if @log_plan
        @logger.info "Moving #{key.topic},#{key.partition} from #{existing} to #{wanted}"
      end
      plan = plan + Scala::Tuple.new(key, wanted)
    end
  end
  plan
end

#generate_for_new_topic(topic, partition_count) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/ktl/shuffle_plan.rb', line 39

# Computes replica lists for a topic that is about to be created.
#
# The replica count is @options[:replication_factor], falling back to 1.
# Each entry returned by assign_replicas_to_brokers is reduced to its replica
# collection, which is converted through JavaConversions.as_java_iterable and
# then turned into an array with to_a; the partition id is dropped.
#
# @param topic [Object] topic handed through to assign_replicas_to_brokers
# @param partition_count [Object] partition count handed through likewise
# @return [Object] result of mapping the assignment to per-partition replica
#   arrays, in the order the assignment yields them
def generate_for_new_topic(topic, partition_count)
  brokers = select_brokers
  replica_count = @options[:replication_factor] || 1
  assign_replicas_to_brokers(topic, brokers, partition_count, replica_count).map do |pair|
    _partition, replicas = pair.elements
    Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a
  end
end