Class: Kazoo::Consumergroup::Instance

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/consumergroup.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(group, id: nil) ⇒ Instance

Returns a new instance of Instance.



129
130
131
132
# File 'lib/kazoo/consumergroup.rb', line 129

# Builds an instance that belongs to +group+.
#
# @param group the consumer group this instance is part of
# @param id explicit identifier for the instance; when it is nil (or false)
#   an identifier is obtained from the class-level +generate_id+
def initialize(group, id: nil)
  @group = group
  @id = id
  # Fall back to a generated identifier only when none was supplied.
  @id ||= self.class.generate_id
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



127
128
129
# File 'lib/kazoo/consumergroup.rb', line 127

# Reader for the consumer group handed to the constructor.
#
# @return the value stored in @group
def group
  @group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



127
128
129
# File 'lib/kazoo/consumergroup.rb', line 127

# Reader for this instance's identifier.
#
# @return the value stored in @id: either the id given to the constructor
#   or the one produced by the class-level generate_id
def id
  @id
end

Class Method Details

.generate_id ⇒ Object



123
124
125
# File 'lib/kazoo/consumergroup.rb', line 123

# Produces an identifier for a consumer instance by joining this machine's
# hostname and a freshly generated UUID with a colon.
#
# @return [String] identifier of the form "<hostname>:<uuid>"
def self.generate_id
  host = Socket.gethostname
  token = SecureRandom.uuid
  "#{host}:#{token}"
end

Instance Method Details

#claim_partition(partition) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/kazoo/consumergroup.rb', line 170

# Claims +partition+ for this instance by creating an ephemeral owner node
# in Zookeeper whose data is this instance's id.
#
# @param partition the partition to claim; must respond to +id+ and +topic+
#   (and the topic to +name+)
# @return [true] when the owner node was created
# @raise [Kazoo::PartitionAlreadyClaimed] when the owner node already exists
# @raise [Kazoo::Error] for any other Zookeeper return code
def claim_partition(partition)
  owner_path = "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}"
  response = cluster.zk.create(path: owner_path, ephemeral: true, data: id)

  rc = response.fetch(:rc)
  return true if rc == Zookeeper::Constants::ZOK

  # An existing node means some instance already holds the claim.
  if rc == Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  end

  raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end

#deregister ⇒ Object



166
167
168
# File 'lib/kazoo/consumergroup.rb', line 166

# Deletes this instance's node under the group's +ids+ path in Zookeeper.
# The outcome of the delete call is not inspected here.
#
# @return whatever the Zookeeper client's +delete+ call returns
def deregister
  registration_path = "/consumers/#{group.name}/ids/#{id}"
  cluster.zk.delete(path: registration_path)
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


202
203
204
# File 'lib/kazoo/consumergroup.rb', line 202

# Two instances are equal when they belong to the same group and share the
# same id.
#
# @param other the object to compare against
# @return [Boolean] false unless +other+ is a Kazoo::Consumergroup::Instance
#   whose group and id both match
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup::Instance)

  group == other.group && id == other.id
end

#hash ⇒ Object



198
199
200
# File 'lib/kazoo/consumergroup.rb', line 198

# Hash code derived from the same two fields that eql? compares, so that
# equal instances land in the same hash bucket.
#
# @return [Integer] the hash of the [group, id] pair
def hash
  identity = [group, id]
  identity.hash
end

#inspect ⇒ Object



194
195
196
# File 'lib/kazoo/consumergroup.rb', line 194

def inspect
  "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>"
end

#register(subscription) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/kazoo/consumergroup.rb', line 139

# Registers this instance as a member of its consumer group.
#
# Creates an ephemeral node under the group's +ids+ path whose data is a JSON
# document describing a static subscription (each topic name mapped to 1),
# then makes sure an +owners+ node exists for every subscribed topic.
#
# @param subscription the topics to subscribe to; must respond to +map+ and
#   +each+, and every element must respond to +name+
# @return the result of +subscription.each+
# @raise [Kazoo::ConsumerInstanceRegistrationFailed] when the instance node
#   cannot be created, or when an owners node is missing and cannot be created
def register(subscription)
  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: JSON.generate({
      version: 1,
      timestamp: Time.now.to_i,
      pattern: "static",
      # Built from [name, 1] pairs; no splat of a flattened argument list.
      subscription: Hash[subscription.map { |topic| [topic.name, 1] }]
    })
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.each do |topic|
    owners_path = "/consumers/#{group.name}/owners/#{topic.name}"
    next if cluster.zk.stat(path: owners_path).fetch(:stat).exists?

    rc = cluster.zk.create(path: owners_path).fetch(:rc)
    # ZNODEEXISTS here means the node appeared between the stat and the
    # create (e.g. another instance registering for the same topic). The
    # node we wanted exists, so this is not a failure.
    next if rc == Zookeeper::Constants::ZOK || rc == Zookeeper::Constants::ZNODEEXISTS

    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{rc}"
  end
end

#registered? ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
137
# File 'lib/kazoo/consumergroup.rb', line 134

# Checks whether this instance's node is present under the group's +ids+
# path in Zookeeper.
#
# @return [Boolean] the +exists?+ answer of the stat returned by Zookeeper
def registered?
  registration_path = "/consumers/#{group.name}/ids/#{id}"
  cluster.zk.stat(path: registration_path).fetch(:stat).exists?
end

#release_partition(partition) ⇒ Object



187
188
189
190
191
192
# File 'lib/kazoo/consumergroup.rb', line 187

# Gives up the claim on +partition+ by deleting its owner node in Zookeeper.
#
# @param partition the partition to release; must respond to +id+ and
#   +topic+ (and the topic to +name+)
# @return [nil] when the delete succeeded
# @raise [Kazoo::Error] when Zookeeper answers with anything other than ZOK
def release_partition(partition)
  owner_path = "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}"
  rc = cluster.zk.delete(path: owner_path).fetch(:rc)
  return if rc == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end