Class: KafkaRest::ConsumerInstance

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_rest/consumer_instance.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(consumer, raw) ⇒ ConsumerInstance

Returns a new instance of ConsumerInstance.



5
6
7
8
9
10
11
12
13
# File 'lib/kafka_rest/consumer_instance.rb', line 5

# Builds a consumer instance from the consumer that created it and the raw
# response describing the instance.
#
# @param consumer [#client] owning consumer; its +client+ is captured too
# @param raw [#fetch] raw consumer response; must contain the keys
#   'instance_id' and 'base_uri'
# @raise [RuntimeError] when either required key is absent from +raw+
def initialize(consumer, raw)
  @client = consumer.client
  @consumer = consumer
  @raw = raw

  # Both keys are mandatory: a missing one aborts construction with a
  # descriptive message instead of fetch's default error.
  @id = raw.fetch('instance_id') do
    raise 'consumer response did not contain instance_id'
  end
  @uri = raw.fetch('base_uri') do
    raise 'consumer response did not contain base_uri'
  end

  # A fresh instance has no subscriptions yet and is considered active.
  @streams = []
  @active = true
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the client captured in #initialize, where it is taken from
# +consumer.client+.
#
# @return [Object] the current value of @client
def client
  @client
end

#consumer ⇒ Object (readonly)

Returns the value of attribute consumer.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the consumer object that was passed to #initialize.
#
# @return [Object] the current value of @consumer
def consumer
  @consumer
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the instance identifier, which #initialize reads from the
# 'instance_id' entry of the raw consumer response.
#
# @return [Object] the current value of @id
def id
  @id
end

#raw ⇒ Object (readonly)

Returns the value of attribute raw.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the raw consumer response exactly as it was passed to
# #initialize (the same object, not a copy).
#
# @return [Object] the current value of @raw
def raw
  @raw
end

#streams ⇒ Object (readonly)

Returns the value of attribute streams.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the list of streams. It starts out empty in #initialize and
# #subscribe appends one ConsumerStream to it per call. The underlying array
# itself is returned, not a copy.
#
# @return [Array] the current value of @streams
def streams
  @streams
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



3
4
5
# File 'lib/kafka_rest/consumer_instance.rb', line 3

# Reader for the instance URI, which #initialize reads from the 'base_uri'
# entry of the raw consumer response. #shutdown! sends its request to it.
#
# @return [Object] the current value of @uri
def uri
  @uri
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/kafka_rest/consumer_instance.rb', line 33

# Reports whether this instance is still active. The flag is set to true on
# construction and to false by #shutdown!.
#
# @return [Boolean] strictly true or false, never the raw flag value
def active?
  @active ? true : false
end

#shutdown! ⇒ Object



27
28
29
30
31
# File 'lib/kafka_rest/consumer_instance.rb', line 27

# Shuts this instance down: every subscribed stream is shut down first, then
# a DELETE request for the instance URI is sent through the client, and
# finally the instance is flagged inactive.
#
# The flag is only cleared after the request returns, so an exception from a
# stream or from the request leaves the instance marked active.
#
# @return [false] the new value of the active flag
def shutdown!
  @streams.each { |stream| stream.shutdown! }
  client.request(uri, verb: Net::HTTP::Delete)
  @active = false
end

#start! ⇒ Object



21
22
23
24
25
# File 'lib/kafka_rest/consumer_instance.rb', line 21

# Starts reading from every subscribed stream concurrently: one thread is
# spawned per stream, each calling +read+ on its stream, and the caller is
# blocked until all of those threads have finished.
#
# @return [Array<Thread>] the (already joined) reader threads
def start!
  readers = @streams.map do |stream|
    Thread.new { stream.read }
  end
  readers.each(&:join)
end

#subscribe(topic) {|stream| ... } ⇒ Object

Yields:

  • (stream)


15
16
17
18
19
# File 'lib/kafka_rest/consumer_instance.rb', line 15

# Creates a ConsumerStream for +topic+ bound to this instance and records it
# in the list of streams. When a block is given, the new stream is yielded
# to it.
#
# @param topic [Object] passed through unchanged to ConsumerStream.new
# @yield [stream] the newly created stream
# @return [Object, nil] the block's result, or nil when no block is given
def subscribe(topic)
  @streams << (created = ConsumerStream.new(self, topic))
  return unless block_given?

  yield created
end