Class: Kazoo::Cluster
- Inherits: Object
- Inheritance chain: Object → Kazoo::Cluster
- Defined in:
- lib/kazoo/cluster.rb
Instance Attribute Summary
-
#zookeeper ⇒ Object
readonly
Returns the value of attribute zookeeper.
Instance Method Summary
- #brokers ⇒ Object
- #close ⇒ Object
- #consumergroups ⇒ Object
-
#initialize(zookeeper) ⇒ Cluster
constructor
A new instance of Cluster.
- #partitions ⇒ Object
- #reset_metadata ⇒ Object
- #topics ⇒ Object
- #under_replicated? ⇒ Boolean
- #zk ⇒ Object
Constructor Details
#initialize(zookeeper) ⇒ Cluster
Returns a new instance of Cluster.
6 7 8 9 |
# File 'lib/kazoo/cluster.rb', line 6
# Builds a cluster handle for the given Zookeeper connection argument.
#
# Nothing is connected or fetched here; the value is only stored, together
# with one dedicated lock per lazily populated piece of state.
#
# @param zookeeper [Object] value later handed to +Zookeeper.new+ by #zk
def initialize(zookeeper)
  @zookeeper = zookeeper

  # One independent Mutex per cache so that they never contend with each other.
  @zk_mutex = Mutex.new
  @brokers_mutex = Mutex.new
  @topics_mutex = Mutex.new
  @consumergroups_mutex = Mutex.new
end
Instance Attribute Details
#zookeeper ⇒ Object (readonly)
Returns the value of attribute zookeeper.
4 5 6 |
# File 'lib/kazoo/cluster.rb', line 4
# Read-only accessor for the value passed to the constructor.
#
# @return [Object] the stored +@zookeeper+ value
def zookeeper
  @zookeeper
end
Instance Method Details
#brokers ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/kazoo/cluster.rb', line 17
# Returns the brokers registered under "/brokers/ids", fetching them on first
# use and memoizing the result in +@brokers+.
#
# The child ids are listed once, then each broker node is read in its own
# thread and turned into a broker via +Kazoo::Broker.from_json+.
#
# @return [Hash] brokers keyed by their id converted with +to_i+
# @raise [StandardError] any error raised while fetching or parsing a broker
#   node is re-raised here; in that case nothing is memoized
def brokers
  @brokers_mutex.synchronize do
    @brokers ||= begin
      ids = zk.get_children(path: "/brokers/ids").fetch(:children)
      result = {}
      result_mutex = Mutex.new

      # Keep the Thread objects themselves. ThreadGroup#list only reports
      # threads that are still alive, so a worker that already died with an
      # exception would never be joined and its failure would be dropped,
      # leaving a partial result to be cached.
      workers = ids.map do |id|
        Thread.new do
          broker_info = zk.get(path: "/brokers/ids/#{id}")
          broker = Kazoo::Broker.from_json(self, id, JSON.parse(broker_info.fetch(:data)))
          result_mutex.synchronize { result[id.to_i] = broker }
        end
      end

      # Thread#join re-raises the worker's exception in the calling thread.
      workers.each(&:join)
      result
    end
  end
end
#close ⇒ Object
74 75 76 |
# File 'lib/kazoo/cluster.rb', line 74
# Closes the Zookeeper client returned by #zk.
#
# @return [Object] whatever the client's +close+ returns
def close
  client = zk
  client.close
end
#consumergroups ⇒ Object
36 37 38 39 40 41 |
# File 'lib/kazoo/cluster.rb', line 36
# Returns the consumer groups listed under "/consumers", fetching them on
# first use and memoizing the result in +@consumergroups+.
#
# The lookup runs while holding +@consumergroups_mutex+ (the lock created for
# this cache in the constructor), matching how #brokers, #topics and #zk guard
# their own memoized state.
#
# @return [Array] one +Kazoo::Consumergroup+ per child name, in listing order
def consumergroups
  @consumergroups_mutex.synchronize do
    @consumergroups ||= begin
      consumers = zk.get_children(path: "/consumers")
      consumers.fetch(:children).map { |name| Kazoo::Consumergroup.new(self, name) }
    end
  end
end
#partitions ⇒ Object
62 63 64 |
# File 'lib/kazoo/cluster.rb', line 62
# Collects the partitions of every topic into one flat list.
#
# @return [Array] the concatenation of each topic's +partitions+, in the
#   iteration order of the #topics hash
def partitions
  topics.each_value.flat_map { |topic| topic.partitions }
end
#reset_metadata ⇒ Object
66 67 68 |
# File 'lib/kazoo/cluster.rb', line 66
# Drops the memoized metadata so the next call to #topics, #brokers or
# #consumergroups fetches it again.
#
# Only the cached values are cleared; the Zookeeper client held in +@zk+ is
# left untouched.
#
# @return [Array<nil>] the result of the parallel assignment; not meaningful
def reset_metadata
  @topics, @brokers, @consumergroups = nil, nil, nil
end
#topics ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/kazoo/cluster.rb', line 43
# Returns the topics registered under "/brokers/topics", fetching them on
# first use and memoizing the result in +@topics+.
#
# The child names are listed once, then each topic node is read in its own
# thread and turned into a topic via +Kazoo::Topic.from_json+.
#
# @return [Hash] topics keyed by topic name
# @raise [StandardError] any error raised while fetching or parsing a topic
#   node is re-raised here; in that case nothing is memoized
def topics
  @topics_mutex.synchronize do
    @topics ||= begin
      names = zk.get_children(path: "/brokers/topics").fetch(:children)
      result = {}
      result_mutex = Mutex.new

      # Keep the Thread objects themselves. ThreadGroup#list only reports
      # threads that are still alive, so a worker that already died with an
      # exception would never be joined and its failure would be dropped,
      # leaving a partial result to be cached.
      workers = names.map do |name|
        Thread.new do
          topic_info = zk.get(path: "/brokers/topics/#{name}")
          topic = Kazoo::Topic.from_json(self, name, JSON.parse(topic_info.fetch(:data)))
          result_mutex.synchronize { result[name] = topic }
        end
      end

      # Thread#join re-raises the worker's exception in the calling thread.
      workers.each(&:join)
      result
    end
  end
end
#under_replicated? ⇒ Boolean
70 71 72 |
# File 'lib/kazoo/cluster.rb', line 70
# Tells whether at least one partition in the cluster reports itself as
# under-replicated.
#
# @return [Boolean] true as soon as any element of #partitions answers
#   truthily to +under_replicated?+, false otherwise (including when there
#   are no partitions)
def under_replicated?
  partitions.any? { |partition| partition.under_replicated? }
end
#zk ⇒ Object
11 12 13 14 15 |
# File 'lib/kazoo/cluster.rb', line 11
# Returns the Zookeeper client, creating it on first use.
#
# Creation happens while holding +@zk_mutex+, and the client is memoized in
# +@zk+ so later calls return the same object.
#
# @return [Zookeeper] client built from the #zookeeper value
def zk
  @zk_mutex.synchronize { @zk ||= Zookeeper.new(zookeeper) }
end