Class: Kafkat::Interface::Zookeeper

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkat/interface/zookeeper.rb

Defined Under Namespace

Classes: NotFoundError, WriteConflictError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Zookeeper

Returns a new instance of Zookeeper.



11
12
13
# File 'lib/kafkat/interface/zookeeper.rb', line 11

def initialize(config)
  @zk_path = config.zk_path
end

Instance Attribute Details

#zk_pathObject (readonly)

Returns the value of attribute zk_path.



9
10
11
# File 'lib/kafkat/interface/zookeeper.rb', line 9

def zk_path
  @zk_path
end

Instance Method Details

#get_broker(id) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/kafkat/interface/zookeeper.rb', line 33

def get_broker(id)
  path = broker_path(id)
  string = zk.get(path).first
  json = JSON.parse(string)
  host, port = json['host'], json['port']
  Broker.new(id, host, port)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_brokers(ids = nil) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kafkat/interface/zookeeper.rb', line 15

def get_brokers(ids=nil)
  brokers = {}
  ids ||= zk.children(brokers_path)

  threads = ids.map do |id|
    id = id.to_i
    Thread.new do
      begin
        brokers[id] = get_broker(id)
      rescue
      end
    end
  end
  threads.map(&:join)

  brokers
end

#get_controllerObject



90
91
92
93
94
95
96
97
# File 'lib/kafkat/interface/zookeeper.rb', line 90

def get_controller
  string = zk.get(controller_path).first
  controller_json = JSON.parse(string)
  controller_id = controller_json['brokerid']
  get_broker(controller_id)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_topic(name) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/kafkat/interface/zookeeper.rb', line 60

def get_topic(name)
  path1 = topic_path(name)
  topic_string = zk.get(path1).first
  topic_json = JSON.parse(topic_string)

  partitions = []
  path2 = topic_partitions_path(name)

  threads = zk.children(path2).map do |id|
    id = id.to_i
    Thread.new do
      path3 = topic_partition_state_path(name, id)
      partition_string = zk.get(path3).first
      partition_json = JSON.parse(partition_string)

      replicas = topic_json['partitions'][id.to_s]
      leader = partition_json['leader']
      isr = partition_json['isr']

      partitions << Partition.new(name, id, replicas, leader, isr)
    end
  end
  threads.map(&:join)

  partitions.sort_by!(&:id)
  Topic.new(name, partitions)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_topics(names = nil) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kafkat/interface/zookeeper.rb', line 43

def get_topics(names=nil)
  topics = {}
  names ||= zk.children(topics_path)

  threads = names.map do |name|
    Thread.new do
      begin
        topics[name] = get_topic(name)
      rescue => e
      end
    end
  end
  threads.map(&:join)

  topics
end

#write_leader(partition, broker_id) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/kafkat/interface/zookeeper.rb', line 99

def write_leader(partition, broker_id)
  path = topic_partition_state_path(partition.topic_name, partition.id)
  string, stat = zk.get(path)

  partition_json = JSON.parse(string)
  partition_json['leader'] = broker_id
  new_string = JSON.dump(partition_json)

  unless zk.set(path, new_string, version: stat.version)
    raise ChangedDuringUpdateError
  end
end