Class: Kafkat::Interface::Admin

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkat/interface/admin.rb

Defined Under Namespace

Classes: ExecutionFailedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Admin

Returns a new instance of Admin.



11
12
13
14
# File 'lib/kafkat/interface/admin.rb', line 11

def initialize(config)
  @kafka_path = config.kafka_path
  @zk_path = config.zk_path
end

Instance Attribute Details

#kafka_pathObject (readonly)

Returns the value of attribute kafka_path.



8
9
10
# File 'lib/kafkat/interface/admin.rb', line 8

def kafka_path
  @kafka_path
end

#zk_pathObject (readonly)

Returns the value of attribute zk_path.



9
10
11
# File 'lib/kafkat/interface/admin.rb', line 9

def zk_path
  @zk_path
end

Instance Method Details

#elect_leaders!(partitions) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/kafkat/interface/admin.rb', line 16

def elect_leaders!(partitions)
  file = Tempfile.new('kafkat-partitions.json')

  json_partitions = []
  partitions.each do |p|
    json_partitions << {
      'topic' => p.topic_name,
      'partition' => p.id
    }
  end

  json = {'partitions' => json_partitions}
  file.write(JSON.dump(json))
  file.close

  run_tool(
    'kafka-preferred-replica-election',
      '--path-to-json-file', file.path
  )
ensure
  file.unlink
end

#reassign!(assignments) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/kafkat/interface/admin.rb', line 39

def reassign!(assignments)
  file = Tempfile.new('kafkat-partitions.json')

  json_partitions = []
  assignments.each do |a|
    json_partitions << {
      'topic' => a.topic_name,
      'partition' => a.partition_id,
      'replicas' => a.replicas
    }
  end

  json = {
    'partitions' => json_partitions,
    'version' => 1
  }

  file.write(JSON.dump(json))
  file.close

  run_tool(
    'kafka-reassign-partitions',
      '--execute',
      '--reassignment-json-file', file.path
  )
ensure
  file.unlink
end

#run_tool(name, *args) ⇒ Object



80
81
82
83
84
85
86
87
# File 'lib/kafkat/interface/admin.rb', line 80

def run_tool(name, *args)
  path = File.join(kafka_path, "bin/#{name}.sh")
  args += ['--zookeeper', "\"#{zk_path}\""]
  args_string = args.join(' ')
  result = `#{path} #{args_string}`
  raise ExecutionFailedError if $?.to_i > 0
  result
end

#shutdown!(broker_id, options = {}) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/kafkat/interface/admin.rb', line 68

def shutdown!(broker_id, options={})
  args = ['--broker', broker_id]
  args += ['--num.retries', options[:retries]] if options[:retries]
  args += ['--retry.interval.ms', option[:interval]] if options[:interval]

  run_tool(
    'kafka-run-class',
      'kafka.admin.ShutdownBroker',
      *args
  )
end