Class: Kafkat::Interface::Zookeeper
- Inherits:
- Object
- Inheritance chain (show all):
- Object
- Kafkat::Interface::Zookeeper
- Defined in:
- lib/kafkat/interface/zookeeper.rb
Defined Under Namespace
Classes: NotFoundError, WriteConflictError
Instance Attribute Summary collapse
Instance Method Summary collapse
Constructor Details
#initialize(config) ⇒ Zookeeper
Returns a new instance of Zookeeper.
11
12
13
|
# File 'lib/kafkat/interface/zookeeper.rb', line 11
# Builds the interface from a configuration object.
#
# Only the +zk_path+ reader of +config+ is used here; its value is kept in
# +@zk_path+.
#
# @param config [#zk_path] configuration object
def initialize(config)
@zk_path = config.zk_path
end
|
Instance Attribute Details
#zk_path ⇒ Object
Returns the value of attribute zk_path.
9
10
11
|
# File 'lib/kafkat/interface/zookeeper.rb', line 9
# Reader for the +@zk_path+ instance variable.
#
# @return [Object] the current value of +@zk_path+ (nil if it was never set)
def zk_path
@zk_path
end
|
Instance Method Details
#get_broker(id) ⇒ Object
33
34
35
36
37
38
39
40
41
|
# File 'lib/kafkat/interface/zookeeper.rb', line 33
# Looks up a single broker's registration node in ZooKeeper.
#
# Reads the node at +broker_path(id)+, parses its payload as JSON and builds
# a Broker from the +host+ and +port+ fields of that document.
#
# @param id [Object] broker id; handed to +broker_path+ and to +Broker.new+ as-is
# @return [Broker] the broker built from the node's +host+ and +port+
# @raise [NotFoundError] when ZooKeeper reports the node does not exist
def get_broker(id)
  payload = zk.get(broker_path(id)).first
  info = JSON.parse(payload)
  Broker.new(id, info['host'], info['port'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
|
#get_brokers(ids = nil) ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/kafkat/interface/zookeeper.rb', line 15
# Fetches several brokers concurrently, using one lookup thread per id.
#
# Lookups are best-effort: any StandardError raised while fetching a broker
# causes that id to be left out of the result instead of failing the call.
#
# Each thread returns its own [id, broker] pair, which is collected with
# Thread#value. No collection is mutated from more than one thread, and the
# resulting Hash follows the order of +ids+ rather than thread completion.
#
# @param ids [Array<#to_i>, nil] broker ids; when nil, the children of
#   +brokers_path+ are used
# @return [Hash{Integer => Object}] +id.to_i+ mapped to the value returned
#   by +get_broker+ for every id whose lookup succeeded
def get_brokers(ids = nil)
  ids ||= zk.children(brokers_path)

  lookups = ids.map do |raw_id|
    broker_id = raw_id.to_i
    Thread.new do
      begin
        [broker_id, get_broker(broker_id)]
      rescue StandardError
        # Deliberate best-effort: a broker that cannot be read is skipped.
        nil
      end
    end
  end

  lookups.each_with_object({}) do |lookup, brokers|
    pair = lookup.value
    brokers[pair.first] = pair.last if pair
  end
end
|
#get_controller ⇒ Object
90
91
92
93
94
95
96
97
|
# File 'lib/kafkat/interface/zookeeper.rb', line 90
# Resolves the broker that is currently registered as controller.
#
# Reads the node at +controller_path+, parses it as JSON and passes its
# +brokerid+ field to +get_broker+.
#
# @return [Object] whatever +get_broker+ returns for the controller's id
# @raise [NotFoundError] when ZooKeeper reports a missing node
def get_controller
  raw = zk.get(controller_path).first
  get_broker(JSON.parse(raw)['brokerid'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
|
#get_topic(name) ⇒ Object
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/kafkat/interface/zookeeper.rb', line 60
# Loads a topic together with the state of each of its partitions.
#
# The topic node (+topic_path+) supplies the replica assignment under its
# +partitions+ key. The children of +topic_partitions_path+ supply the
# partition ids, and each partition's state node supplies +leader+ and
# +isr+. State nodes are read concurrently, one thread per partition.
#
# Every thread returns its own Partition, which is collected with
# Thread#value, so no collection is appended to from several threads at
# once. Thread#value also re-raises an exception from a worker, so a
# missing state node is still reported as NotFoundError.
#
# @param name [String] topic name
# @return [Topic] topic whose partitions are sorted by id
# @raise [NotFoundError] when ZooKeeper reports a missing node for the
#   topic, its partition list or any partition state
def get_topic(name)
  topic_json = JSON.parse(zk.get(topic_path(name)).first)

  workers = zk.children(topic_partitions_path(name)).map do |raw_id|
    partition_id = raw_id.to_i
    Thread.new do
      state_path = topic_partition_state_path(name, partition_id)
      state = JSON.parse(zk.get(state_path).first)
      # The assignment map is keyed by the id's decimal string form.
      replicas = topic_json['partitions'][partition_id.to_s]
      Partition.new(name, partition_id, replicas, state['leader'], state['isr'])
    end
  end

  Topic.new(name, workers.map(&:value).sort_by(&:id))
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
|
#get_topics(names = nil) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/kafkat/interface/zookeeper.rb', line 43
# Fetches several topics concurrently, using one lookup thread per name.
#
# Lookups are best-effort: any StandardError raised while fetching a topic
# causes that name to be left out of the result instead of failing the call.
#
# Each thread returns its own [name, topic] pair, which is collected with
# Thread#value. No collection is mutated from more than one thread, and the
# resulting Hash follows the order of +names+ rather than thread completion.
#
# @param names [Array<String>, nil] topic names; when nil, the children of
#   +topics_path+ are used
# @return [Hash{String => Object}] name mapped to the value returned by
#   +get_topic+ for every name whose lookup succeeded
def get_topics(names = nil)
  names ||= zk.children(topics_path)

  lookups = names.map do |name|
    Thread.new do
      begin
        [name, get_topic(name)]
      rescue StandardError
        # Deliberate best-effort: a topic that cannot be read is skipped.
        nil
      end
    end
  end

  lookups.each_with_object({}) do |lookup, topics|
    pair = lookup.value
    topics[pair.first] = pair.last if pair
  end
end
|
#write_leader(partition, broker_id) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/kafkat/interface/zookeeper.rb', line 99
# Rewrites the +leader+ field of a partition's state node.
#
# The node is read together with its stat, the parsed JSON gets its
# +leader+ replaced, and the document is written back conditioned on the
# version that was read.
#
# A falsy result from +zk.set+ is reported as WriteConflictError, the error
# class this interface declares. The constant raised previously is not
# among the classes declared under this namespace.
#
# NOTE(review): the zk gem documents #set as raising
# ZK::Exceptions::BadVersion on a version mismatch. If so, that exception
# propagates from here unchanged. Confirm whether it should also be
# reported as WriteConflictError.
#
# @param partition [#topic_name, #id] partition whose state node is updated
# @param broker_id [Object] value stored under the +leader+ key
# @return [nil]
# @raise [WriteConflictError] when +zk.set+ returns a falsy value
def write_leader(partition, broker_id)
  path = topic_partition_state_path(partition.topic_name, partition.id)
  current, stat = zk.get(path)

  state = JSON.parse(current)
  state['leader'] = broker_id
  updated = JSON.dump(state)

  raise WriteConflictError unless zk.set(path, updated, version: stat.version)
end
|