Class: Ktl::Cluster

Inherits:
Command show all
Defined in:
lib/ktl/cluster.rb

Instance Method Summary collapse

Instance Method Details

#decommission_broker(broker_id) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/ktl/cluster.rb', line 92

# Builds a reassignment plan for +broker_id+ and passes it, together with a
# freshly created reassigner, to +execute_reassignment+.
#
# When the +rendezvous+ option is set, a RendezvousShufflePlan is built with
# the broker as its only blacklist entry; otherwise a DecommissionPlan is
# built for that broker.
#
# @param broker_id [#to_i] broker id; coerced with +to_i+ before use
# @return [Object] whatever +with_zk_client+ returns
def decommission_broker(broker_id)
  with_zk_client do |zk|
    plan =
      if options.rendezvous?
        RendezvousShufflePlan.new(zk, blacklist: [broker_id.to_i])
      else
        DecommissionPlan.new(zk, broker_id.to_i)
      end
    execute_reassignment(create_reassigner(zk, options), plan, options)
  end
end

#migrate_broker ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/ktl/cluster.rb', line 40

# Builds a MigrationPlan from the +:from+ and +:to+ options and passes it,
# together with a freshly created reassigner, to +execute_reassignment+.
#
# Every entry of both option lists is coerced with +to_i+. The plan also
# receives the +verbose+ option as +log_plan+ and the command's logger.
#
# @return [Object] whatever +with_zk_client+ returns
def migrate_broker
  with_zk_client do |zk|
    source_ids, target_ids = options.values_at(:from, :to).map { |ids| ids.map(&:to_i) }
    plan = MigrationPlan.new(
      zk,
      source_ids,
      target_ids,
      log_plan: options.verbose,
      logger: logger
    )
    execute_reassignment(create_reassigner(zk, options), plan, options)
  end
end

#preferred_replica(regexp = '.*') ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/ktl/cluster.rb', line 16

# Triggers Kafka::Admin.preferred_replica for every partition whose topic
# matches +regexp+.
#
# The pattern is compiled with Regexp.new and matched against each
# partition's +topic+. When nothing matches, only an informational log line
# is written and no election is requested.
#
# @param regexp [String, Regexp] topic pattern; defaults to matching everything
# @return [Object] whatever +with_zk_client+ returns
def preferred_replica(regexp = '.*')
  with_zk_client do |zk|
    topic_pattern = Regexp.new(regexp)
    # The block result is forced to a strict boolean with `!!`.
    matching = zk.all_partitions.filter { |tp| !!tp.topic.match(topic_pattern) }.to_set
    if matching.size > 0
      logger.info 'performing preferred replica leader election on %d partitions' % matching.size
      Kafka::Admin.preferred_replica(zk.raw_client, matching)
    else
      logger.info 'no topics matched %s' % topic_pattern.inspect
    end
  end
end

#reassignment_progress ⇒ Object



107
108
109
110
111
112
# File 'lib/ktl/cluster.rb', line 107

# Creates a ReassignmentProgress from the command options (with the logger
# merged in under +:logger+) and asks it to display itself on the shell.
#
# @return [Object] whatever +with_zk_client+ returns
def reassignment_progress
  with_zk_client do |zk|
    progress_options = options.merge(logger: logger)
    ReassignmentProgress.new(zk, progress_options).display(shell)
  end
end

#shuffle(regexp = '.*') ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/ktl/cluster.rb', line 62

# Builds a shuffle plan and passes it, together with a freshly created
# reassigner, to +execute_reassignment+.
#
# The plan class is chosen from the options: +rack_aware+ selects
# RackAwareShufflePlan, otherwise +rendezvous+ selects RendezvousShufflePlan,
# otherwise ShufflePlan is used. +rack_aware+ takes precedence when both
# options are set.
#
# @param regexp [String, Regexp] pattern compiled with Regexp.new and handed
#   to the plan as +:filter+; defaults to matching everything
# @return [Object] whatever +with_zk_client+ returns
def shuffle(regexp = '.*')
  with_zk_client do |zk|
    plan_class =
      case
      when options.rack_aware then RackAwareShufflePlan
      when options.rendezvous then RendezvousShufflePlan
      else ShufflePlan
      end
    plan_settings = {
      filter: Regexp.new(regexp),
      brokers: options.brokers,
      blacklist: options.blacklist,
      replication_factor: options.replication_factor,
      logger: logger,
      log_plan: options.dryrun,
    }
    plan = plan_class.new(zk, plan_settings)
    execute_reassignment(create_reassigner(zk, options), plan, options)
  end
end

#stats ⇒ Object



7
8
9
10
11
12
# File 'lib/ktl/cluster.rb', line 7

# Runs a ClusterStatsTask built from the ZooKeeper client and the shell.
#
# @return [Object] whatever +with_zk_client+ returns
def stats
  with_zk_client { |zk| ClusterStatsTask.new(zk, shell).execute }
end