Class: Kazoo::Consumergroup::Instance
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup::Instance
- Defined in:
- lib/kazoo/consumergroup.rb
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Class Method Summary collapse
Instance Method Summary collapse
- #claim_partition(partition) ⇒ Object
- #deregister ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #hash ⇒ Object
-
#initialize(group, id: nil) ⇒ Instance
constructor
A new instance of Instance.
- #inspect ⇒ Object
- #register(subscription) ⇒ Object
- #registered? ⇒ Boolean
- #release_partition(partition) ⇒ Object
Constructor Details
#initialize(group, id: nil) ⇒ Instance
Returns a new instance of Instance.
129 130 131 132 |
# File 'lib/kazoo/consumergroup.rb', line 129 def initialize(group, id: nil) @group = group @id = id || self.class.generate_id end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
127 128 129 |
# File 'lib/kazoo/consumergroup.rb', line 127 def group @group end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
127 128 129 |
# File 'lib/kazoo/consumergroup.rb', line 127 def id @id end |
Class Method Details
.generate_id ⇒ Object
123 124 125 |
# File 'lib/kazoo/consumergroup.rb', line 123 def self.generate_id "#{Socket.gethostname}:#{SecureRandom.uuid}" end |
Instance Method Details
#claim_partition(partition) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/kazoo/consumergroup.rb', line 170 def claim_partition(partition) result = cluster.zk.create( path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}", ephemeral: true, data: id, ) case result.fetch(:rc) when Zookeeper::Constants::ZOK return true when Zookeeper::Constants::ZNODEEXISTS raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!" else raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |
#deregister ⇒ Object
166 167 168 |
# File 'lib/kazoo/consumergroup.rb', line 166 def deregister cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}") end |
#eql?(other) ⇒ Boolean Also known as: ==
202 203 204 |
# File 'lib/kazoo/consumergroup.rb', line 202 def eql?(other) other.kind_of?(Kazoo::Consumergroup::Instance) && group == other.group && id == other.id end |
#hash ⇒ Object
198 199 200 |
# File 'lib/kazoo/consumergroup.rb', line 198 def hash [group, id].hash end |
#inspect ⇒ Object
194 195 196 |
# File 'lib/kazoo/consumergroup.rb', line 194 def inspect "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>" end |
#register(subscription) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/kazoo/consumergroup.rb', line 139 def register(subscription) result = cluster.zk.create( path: "/consumers/#{group.name}/ids/#{id}", ephemeral: true, data: JSON.generate({ version: 1, timestamp: Time.now.to_i, pattern: "static", subscription: Hash[*subscription.flat_map { |topic| [topic.name, 1] } ] }) ) if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}" end subscription.each do |topic| stat = cluster.zk.stat(path: "/consumers/#{group.name}/owners/#{topic.name}") unless stat.fetch(:stat).exists? result = cluster.zk.create(path: "/consumers/#{group.name}/owners/#{topic.name}") if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}" end end end end |
#registered? ⇒ Boolean
134 135 136 137 |
# File 'lib/kazoo/consumergroup.rb', line 134 def registered? stat = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}") stat.fetch(:stat).exists? end |
#release_partition(partition) ⇒ Object
187 188 189 190 191 192 |
# File 'lib/kazoo/consumergroup.rb', line 187 def release_partition(partition) result = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}") if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |