Class: Kafkat::Interface::Admin
- Inherits:
-
Object
- Object
- Kafkat::Interface::Admin
- Defined in:
- lib/kafkat/interface/admin.rb
Defined Under Namespace
Classes: ExecutionFailedError
Instance Attribute Summary collapse
-
#kafka_path ⇒ Object
readonly
Returns the value of attribute kafka_path.
-
#zk_path ⇒ Object
readonly
Returns the value of attribute zk_path.
Instance Method Summary collapse
- #elect_leaders!(partitions) ⇒ Object
-
#initialize(config) ⇒ Admin
constructor
A new instance of Admin.
- #reassign!(assignments) ⇒ Object
- #run_tool(name, *args) ⇒ Object
- #shutdown!(broker_id, options = {}) ⇒ Object
Constructor Details
#initialize(config) ⇒ Admin
Returns a new instance of Admin.
11 12 13 14 |
# File 'lib/kafkat/interface/admin.rb', line 11 def initialize(config) @kafka_path = config.kafka_path @zk_path = config.zk_path end |
Instance Attribute Details
#kafka_path ⇒ Object (readonly)
Returns the value of attribute kafka_path.
8 9 10 |
# File 'lib/kafkat/interface/admin.rb', line 8 def kafka_path @kafka_path end |
#zk_path ⇒ Object (readonly)
Returns the value of attribute zk_path.
9 10 11 |
# File 'lib/kafkat/interface/admin.rb', line 9 def zk_path @zk_path end |
Instance Method Details
#elect_leaders!(partitions) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/kafkat/interface/admin.rb', line 16 def elect_leaders!(partitions) file = Tempfile.new('kafkat-partitions.json') json_partitions = [] partitions.each do |p| json_partitions << { 'topic' => p.topic_name, 'partition' => p.id } end json = {'partitions' => json_partitions} file.write(JSON.dump(json)) file.close run_tool( 'kafka-preferred-replica-election', '--path-to-json-file', file.path ) ensure file.unlink end |
#reassign!(assignments) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/kafkat/interface/admin.rb', line 39 def reassign!(assignments) file = Tempfile.new('kafkat-partitions.json') json_partitions = [] assignments.each do |a| json_partitions << { 'topic' => a.topic_name, 'partition' => a.partition_id, 'replicas' => a.replicas } end json = { 'partitions' => json_partitions, 'version' => 1 } file.write(JSON.dump(json)) file.close run_tool( 'kafka-reassign-partitions', '--execute', '--reassignment-json-file', file.path ) ensure file.unlink end |
#run_tool(name, *args) ⇒ Object
80 81 82 83 84 85 86 87 |
# File 'lib/kafkat/interface/admin.rb', line 80 def run_tool(name, *args) path = File.join(kafka_path, "bin/#{name}.sh") args += ['--zookeeper', "\"#{zk_path}\""] args_string = args.join(' ') result = `#{path} #{args_string}` raise ExecutionFailedError if $?.to_i > 0 result end |
#shutdown!(broker_id, options = {}) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/kafkat/interface/admin.rb', line 68 def shutdown!(broker_id, ={}) args = ['--broker', broker_id] args += ['--num.retries', [:retries]] if [:retries] args += ['--retry.interval.ms', option[:interval]] if [:interval] run_tool( 'kafka-run-class', 'kafka.admin.ShutdownBroker', *args ) end |