Class: Ktl::Cluster
Instance Method Summary
- #decommission_broker(broker_id) ⇒ Object
- #migrate_broker ⇒ Object
- #preferred_replica(regexp = '.*') ⇒ Object
- #reassignment_progress ⇒ Object
- #shuffle(regexp = '.*') ⇒ Object
- #stats ⇒ Object
Instance Method Details
#decommission_broker(broker_id) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/ktl/cluster.rb', line 92
#
# Builds a reassignment plan for the given broker and executes it.
#
# When the `rendezvous?` option is set, a RendezvousShufflePlan is built with
# the broker blacklisted; otherwise a DecommissionPlan is built for the broker.
#
# NOTE(review): the `options` receiver/argument was missing from the extracted
# listing (`if .rendezvous?`, `create_reassigner(zk_client, )`) and is restored
# here; presumably the CLI's parsed-options accessor — confirm against the
# original file.
#
# @param broker_id [#to_i] id of the broker; coerced with `to_i`
# @return [Object] whatever `with_zk_client` returns for the block
def decommission_broker(broker_id)
  with_zk_client do |zk_client|
    plan = if options.rendezvous?
      RendezvousShufflePlan.new(zk_client, blacklist: [broker_id.to_i])
    else
      DecommissionPlan.new(zk_client, broker_id.to_i)
    end
    reassigner = create_reassigner(zk_client, options)
    execute_reassignment(reassigner, plan, options)
  end
end
#migrate_broker ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/ktl/cluster.rb', line 40
#
# Builds a MigrationPlan from the `:from` and `:to` option values (each entry
# coerced with `to_i`) and executes the resulting reassignment.
#
# NOTE(review): the `options` receiver/argument was missing from the extracted
# listing (`.values_at(:from, :to)`, `.verbose`, `create_reassigner(zk_client, )`)
# and is restored here; presumably the CLI's parsed-options accessor — confirm
# against the original file.
#
# @return [Object] whatever `with_zk_client` returns for the block
def migrate_broker
  with_zk_client do |zk_client|
    old_brokers, new_brokers = options.values_at(:from, :to)
    plan = MigrationPlan.new(
      zk_client,
      old_brokers.map(&:to_i),
      new_brokers.map(&:to_i),
      log_plan: options.verbose,
      logger: logger
    )
    reassigner = create_reassigner(zk_client, options)
    execute_reassignment(reassigner, plan, options)
  end
end
#preferred_replica(regexp = '.*') ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/ktl/cluster.rb', line 16
#
# Triggers a preferred replica leader election for every partition whose topic
# matches the given pattern, or logs that nothing matched.
#
# @param regexp [String, Regexp] pattern passed to `Regexp.new`; defaults to
#   matching everything
# @return [Object] whatever `with_zk_client` returns for the block
def preferred_replica(regexp='.*')
  with_zk_client do |zk_client|
    pattern = Regexp.new(regexp)
    matching = zk_client.all_partitions.filter { |tp| !!tp.topic.match(pattern) }.to_set
    if matching.size > 0
      logger.info(format('performing preferred replica leader election on %d partitions', matching.size))
      Kafka::Admin.preferred_replica(zk_client.raw_client, matching)
    else
      logger.info(format('no topics matched %s', pattern.inspect))
    end
  end
end
#reassignment_progress ⇒ Object
107 108 109 110 111 112 |
# File 'lib/ktl/cluster.rb', line 107
#
# Creates a ReassignmentProgress with the options (plus the logger) and asks it
# to display itself on the shell.
#
# NOTE(review): the receiver of `.merge(logger: logger)` was missing from the
# extracted listing and is restored here as `options`; presumably the CLI's
# parsed-options accessor — confirm against the original file.
#
# @return [Object] whatever `with_zk_client` returns for the block
def reassignment_progress
  with_zk_client do |zk_client|
    progress = ReassignmentProgress.new(zk_client, options.merge(logger: logger))
    progress.display(shell)
  end
end
#shuffle(regexp = '.*') ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/ktl/cluster.rb', line 62
#
# Builds a shuffle plan restricted to topics matching the given pattern and
# executes the resulting reassignment.
#
# The plan class is chosen from the options: `rack_aware` selects
# RackAwareShufflePlan, otherwise `rendezvous` selects RendezvousShufflePlan,
# otherwise ShufflePlan is used.
#
# NOTE(review): every `options` receiver/argument was missing from the extracted
# listing (`if .rack_aware`, `brokers: .brokers`, `create_reassigner(zk_client, )`,
# ...) and is restored here; presumably the CLI's parsed-options accessor —
# confirm against the original file.
#
# @param regexp [String, Regexp] pattern passed to `Regexp.new` and handed to
#   the plan as `:filter`; defaults to matching everything
# @return [Object] whatever `with_zk_client` returns for the block
def shuffle(regexp='.*')
  with_zk_client do |zk_client|
    plan_factory = if options.rack_aware
      RackAwareShufflePlan
    elsif options.rendezvous
      RendezvousShufflePlan
    else
      ShufflePlan
    end
    plan = plan_factory.new(zk_client, {
      filter: Regexp.new(regexp),
      brokers: options.brokers,
      blacklist: options.blacklist,
      replication_factor: options.replication_factor,
      logger: logger,
      log_plan: options.dryrun,
    })
    reassigner = create_reassigner(zk_client, options)
    execute_reassignment(reassigner, plan, options)
  end
end
#stats ⇒ Object
7 8 9 10 11 12 |
# File 'lib/ktl/cluster.rb', line 7
#
# Runs a ClusterStatsTask, built from the ZooKeeper client and the shell,
# inside `with_zk_client`.
#
# @return [Object] whatever `with_zk_client` returns for the block
def stats
  with_zk_client { |zk_client| ClusterStatsTask.new(zk_client, shell).execute }
end