Class: KafkaRest::ConsumerInstance
- Inherits: Object
- Inheritance chain: Object → KafkaRest::ConsumerInstance
- Defined in:
- lib/kafka_rest/consumer_instance.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#raw ⇒ Object
readonly
Returns the value of attribute raw.
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
- #active? ⇒ Boolean
-
#initialize(consumer, raw) ⇒ ConsumerInstance
constructor
A new instance of ConsumerInstance.
- #shutdown! ⇒ Object
- #start! ⇒ Object
- #subscribe(topic) {|stream| ... } ⇒ Object
Constructor Details
#initialize(consumer, raw) ⇒ ConsumerInstance
Returns a new instance of ConsumerInstance.
5 6 7 8 9 10 11 12 13 |
# File 'lib/kafka_rest/consumer_instance.rb', line 5
# Builds a consumer instance from the raw response describing it.
#
# @param consumer [Object] the owning consumer; must respond to #client
# @param raw [#fetch] the raw response (presumably a Hash -- confirm against
#   the caller); must contain the 'instance_id' and 'base_uri' keys
# @raise [RuntimeError] when 'instance_id' or 'base_uri' is missing from raw
def initialize(consumer, raw)
  @client = consumer.client
  @consumer = consumer
  @raw = raw
  # fetch with a block so that a missing key produces a descriptive error
  # instead of a bare KeyError.
  @id = raw.fetch('instance_id') { raise 'consumer response did not contain instance_id' }
  @uri = raw.fetch('base_uri') { raise 'consumer response did not contain base_uri' }
  @streams = []
  @active = true
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor.
#
# @return [Object] whatever is currently stored in @client
def client
  @client
end
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor.
#
# @return [Object] whatever is currently stored in @consumer
def consumer
  @consumer
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor.
#
# @return [Object] whatever is currently stored in @id
def id
  @id
end
#raw ⇒ Object (readonly)
Returns the value of attribute raw.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor.
#
# @return [Object] whatever is currently stored in @raw
def raw
  @raw
end
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor. Hands back the object held in @streams itself,
# not a copy.
#
# @return [Object] whatever is currently stored in @streams
def streams
  @streams
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
3 4 5 |
# File 'lib/kafka_rest/consumer_instance.rb', line 3
# Read-only accessor.
#
# @return [Object] whatever is currently stored in @uri
def uri
  @uri
end
Instance Method Details
#active? ⇒ Boolean
33 34 35 |
# File 'lib/kafka_rest/consumer_instance.rb', line 33
# Reports the @active flag coerced to a strict boolean.
#
# @return [Boolean] true when @active is truthy, false when it is nil or false
def active?
  @active ? true : false
end
#shutdown! ⇒ Object
27 28 29 30 31 |
# File 'lib/kafka_rest/consumer_instance.rb', line 27
# Shuts down every registered stream, issues a DELETE request against this
# instance's uri, and marks the instance inactive.
#
# Calling it again once the instance is inactive is a no-op, so no second
# DELETE is sent.
#
# @return [false] always false
# @raise [StandardError] whatever a stream's #shutdown! or the client
#   request raises is propagated to the caller
def shutdown!
  return false unless @active

  begin
    @streams.each(&:shutdown!)
  ensure
    # Send the DELETE even when a stream failed to shut down; otherwise the
    # instance would never be deleted and would stay flagged as active.
    # If the request itself raises, @active stays true so a retry is possible.
    client.request(uri, verb: Net::HTTP::Delete)
    @active = false
  end
  false
end
#start! ⇒ Object
21 22 23 24 25 |
# File 'lib/kafka_rest/consumer_instance.rb', line 21
# Spawns one thread per registered stream, each calling the stream's #read,
# then blocks until every thread has finished.
#
# @return [Array<Thread>] the joined threads, in the same order as @streams
def start!
  @streams.map { |stream| Thread.new { stream.read } }.each(&:join)
end
#subscribe(topic) {|stream| ... } ⇒ Object
15 16 17 18 19 |
# File 'lib/kafka_rest/consumer_instance.rb', line 15
# Creates a ConsumerStream for the given topic, registers it in @streams,
# and yields it when a block is supplied.
#
# @param topic [Object] passed straight through to ConsumerStream.new
# @yieldparam stream [ConsumerStream] the newly registered stream
# @return [Object] the block's result when a block is given; otherwise the
#   new stream, so a caller without a block can still reach it
def subscribe(topic)
  stream = ConsumerStream.new(self, topic)
  @streams << stream
  return stream unless block_given?

  yield stream
end