Class: Kazoo::Consumergroup
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup
- Defined in:
- lib/kazoo/consumergroup.rb
Defined Under Namespace
Classes: Instance
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #commit_offset(partition, offset) ⇒ Object
- #create ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #exists? ⇒ Boolean
- #hash ⇒ Object
-
#initialize(cluster, name) ⇒ Consumergroup
constructor
A new instance of Consumergroup.
- #inspect ⇒ Object
- #instances ⇒ Object
- #instantiate(id: nil) ⇒ Object
- #reset_offsets ⇒ Object
- #retrieve_offset(partition) ⇒ Object
- #watch_instances(&block) ⇒ Object
- #watch_partition_claim(partition, &block) ⇒ Object
Constructor Details
#initialize(cluster, name) ⇒ Consumergroup
5 6 7 |
# File 'lib/kazoo/consumergroup.rb', line 5
# Builds a consumer group handle from its owning cluster and its name.
#
# @param cluster [Object] stored as-is and exposed through the cluster reader
# @param name [Object] stored as-is and exposed through the name reader
def initialize(cluster, name)
  @cluster = cluster
  @name = name
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3
# Read-only accessor for the cluster handed to the constructor.
#
# @return [Object] the current value of the @cluster instance variable
def cluster
  @cluster
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3
# Read-only accessor for the group name handed to the constructor.
#
# @return [Object] the current value of the @name instance variable
def name
  @name
end
Instance Method Details
#commit_offset(partition, offset) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/kazoo/consumergroup.rb', line 71
# Writes the offset for a partition under this group's offsets subtree.
#
# The stored value is `offset + 1`, converted to a string. If the set call
# reports ZNONODE, the topic node is created first (an already existing topic
# node is accepted) and then the partition node is created with the value.
#
# @param partition [#topic, #id] partition whose topic name and id form the path
# @param offset [#+] offset to commit; `offset + 1` is what gets stored
# @return [nil]
# @raise [Kazoo::Error] when creating the topic node fails with anything other
#   than ZOK/ZNODEEXISTS, or when the final set/create result is not ZOK
def commit_offset(partition, offset)
  partition_label = "#{partition.topic.name}/#{partition.id}"
  topic_path = "/consumers/#{name}/offsets/#{partition.topic.name}"
  offset_path = "#{topic_path}/#{partition.id}"
  payload = (offset + 1).to_s

  outcome = cluster.zk.set(path: offset_path, data: payload)

  if outcome.fetch(:rc) == Zookeeper::Constants::ZNONODE
    # The partition node does not exist yet: make sure its parent does, then create it.
    parent_rc = cluster.zk.create(path: topic_path).fetch(:rc)
    unless [Zookeeper::Constants::ZOK, Zookeeper::Constants::ZNODEEXISTS].include?(parent_rc)
      raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition_label}. Error code: #{parent_rc}"
    end

    outcome = cluster.zk.create(path: offset_path, data: payload)
  end

  final_rc = outcome.fetch(:rc)
  return if final_rc == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition_label}. Error code: #{final_rc}"
end
#create ⇒ Object
9 10 11 12 13 14 |
# File 'lib/kazoo/consumergroup.rb', line 9
# Issues create calls for the group's root node and its ids, owners and
# offsets children, in that order. The results are not inspected here.
#
# @return [Object] whatever the final create call (the offsets node) returned
def create
  root = "/consumers/#{name}"
  nodes = [root, "#{root}/ids", "#{root}/owners", "#{root}/offsets"]
  nodes.map { |node| cluster.zk.create(path: node) }.last
end
#eql?(other) ⇒ Boolean Also known as: ==
111 112 113 |
# File 'lib/kazoo/consumergroup.rb', line 111
# Equality check: the other object must be a Kazoo::Consumergroup whose
# cluster and name both compare equal (via ==) to this group's.
#
# @param other [Object] object to compare against
# @return [Boolean]
def eql?(other)
  if other.is_a?(Kazoo::Consumergroup)
    cluster == other.cluster && name == other.name
  else
    false
  end
end
#exists? ⇒ Boolean
16 17 18 19 |
# File 'lib/kazoo/consumergroup.rb', line 16
# Asks the cluster's zk client for the stat of this group's root node and
# reports whether that stat says the node exists.
#
# @return [Boolean] result of calling exists? on the returned :stat entry
def exists?
  cluster.zk.stat(path: "/consumers/#{name}").fetch(:stat).exists?
end
#hash ⇒ Object
117 118 119 |
# File 'lib/kazoo/consumergroup.rb', line 117
# Hash code derived from the same two fields that eql? compares.
#
# @return [Integer] hash of the [cluster, name] pair
def hash
  identity = [cluster, name]
  identity.hash
end
#inspect ⇒ Object
107 108 109 |
# File 'lib/kazoo/consumergroup.rb', line 107 def inspect "#<Kazoo::Consumergroup name=#{name}>" end |
#instances ⇒ Object
26 27 28 29 |
# File 'lib/kazoo/consumergroup.rb', line 26
# Lists the children of this group's ids node and wraps each child id in an
# Instance bound to this group. The return code of the lookup is not checked.
#
# @return [Array<Instance>] one Instance per child id
def instances
  listing = cluster.zk.get_children(path: "/consumers/#{name}/ids")
  child_ids = listing.fetch(:children)
  child_ids.map { |instance_id| Instance.new(self, id: instance_id) }
end
#instantiate(id: nil) ⇒ Object
22 23 24 |
# File 'lib/kazoo/consumergroup.rb', line 22
# Builds an Instance that belongs to this group.
#
# @param id [Object, nil] passed straight through as the Instance's id keyword
# @return [Instance]
def instantiate(id: nil)
  owner = self
  Instance.new(owner, id: id)
end
#reset_offsets ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/kazoo/consumergroup.rb', line 89
# Removes every stored offset of this group: for each topic node under the
# group's offsets node, deletes all of its partition nodes and then the topic
# node itself.
#
# Each delete result is checked on its own. Previously the delete results
# were discarded and the preceding get_children result was re-checked, so a
# failed delete went unnoticed.
#
# @return [Array] the topic children that were iterated
# @raise [Kazoo::Error] as soon as any get_children or delete call returns a
#   code other than ZOK; nodes deleted before that point stay deleted
def reset_offsets
  offsets_path = "/consumers/#{name}/offsets"

  topics = cluster.zk.get_children(path: offsets_path)
  unless topics.fetch(:rc) == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to list #{offsets_path}. Error code: #{topics.fetch(:rc)}"
  end

  topics.fetch(:children).each do |topic|
    topic_path = "#{offsets_path}/#{topic}"

    partitions = cluster.zk.get_children(path: topic_path)
    unless partitions.fetch(:rc) == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to list #{topic_path}. Error code: #{partitions.fetch(:rc)}"
    end

    # Children must go first; the topic node is deleted only after them.
    partitions.fetch(:children).each do |partition|
      partition_path = "#{topic_path}/#{partition}"
      deleted = cluster.zk.delete(path: partition_path)
      unless deleted.fetch(:rc) == Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to delete #{partition_path}. Error code: #{deleted.fetch(:rc)}"
      end
    end

    deleted = cluster.zk.delete(path: topic_path)
    unless deleted.fetch(:rc) == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to delete #{topic_path}. Error code: #{deleted.fetch(:rc)}"
    end
  end
end
#retrieve_offset(partition) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/kazoo/consumergroup.rb', line 59
# Reads the stored offset node for a partition of this group.
#
# @param partition [#topic, #id] partition whose topic name and id form the path
# @return [Integer, nil] the node data converted with to_i, or nil when the
#   node does not exist (ZNONODE)
# @raise [Kazoo::Error] for any return code other than ZOK or ZNONODE
def retrieve_offset(partition)
  offset_path = "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}"
  reply = cluster.zk.get(path: offset_path)
  rc = reply.fetch(:rc)

  if rc == Zookeeper::Constants::ZOK
    reply.fetch(:data).to_i
  elsif rc == Zookeeper::Constants::ZNONODE
    nil
  else
    raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
  end
end
#watch_instances(&block) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/kazoo/consumergroup.rb', line 31
# Lists the children of this group's ids node while registering a watcher
# callback built from the given block.
#
# The failure message now interpolates the return code; it previously
# contained the literal text "result[:rc]".
#
# @yield passed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] a pair of the Instance list (one per child id) and the
#   callback object that was handed to get_children
# @raise [Kazoo::Error] when get_children returns a code other than ZOK
def watch_instances(&block)
  cb = Zookeeper::Callbacks::WatcherCallback.create(&block)

  result = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: cb)
  unless result.fetch(:rc) == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to watch instances. Error code: #{result.fetch(:rc)}"
  end

  instances = result.fetch(:children).map { |id| Instance.new(self, id: id) }
  [instances, cb]
end
#watch_partition_claim(partition, &block) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/kazoo/consumergroup.rb', line 44
# Reads the owner node of a partition for this group while registering a
# watcher callback built from the given block.
#
# @param partition [#topic, #id] partition whose topic name and id form the path
# @yield passed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] [nil, nil] when the owner node does not exist (ZNONODE);
#   otherwise a pair of the Instance built from the node data and the callback
# @raise [Kazoo::Error] for any return code other than ZOK or ZNONODE
def watch_partition_claim(partition, &block)
  watcher = Zookeeper::Callbacks::WatcherCallback.create(&block)
  owner_path = "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}"

  reply = cluster.zk.get(path: owner_path, watcher: watcher)
  rc = reply.fetch(:rc)

  if rc == Zookeeper::Constants::ZNONODE
    # No owner node yet, so nobody has claimed this partition.
    [nil, nil]
  elsif rc == Zookeeper::Constants::ZOK
    claimant = Kazoo::Consumergroup::Instance.new(self, id: reply.fetch(:data))
    [claimant, watcher]
  else
    raise Kazoo::Error, "Failed set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
  end
end