Class: KafkaRest::ConsumerStream

Inherits:
Object
  • Object
show all
Includes:
EventEmitter
Defined in:
lib/kafka_rest/consumer_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from EventEmitter

#emit, #on

Constructor Details

#initialize(instance, topic) ⇒ ConsumerStream

Returns a new instance of ConsumerStream.



9
10
11
12
13
14
# File 'lib/kafka_rest/consumer_stream.rb', line 9

# Builds a stream bound to a consumer instance and a topic.
#
# The HTTP client is taken from the consumer instance, and the stream starts
# out flagged as active.
#
# @param instance [Object] consumer instance; must respond to +client+
# @param topic [Object] topic this stream is associated with
def initialize(instance, topic)
  consumer_client = instance.client
  @topic = topic
  @instance = instance
  @client = consumer_client
  @active = true
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/kafka_rest/consumer_stream.rb', line 7

# Reader for the +@client+ instance variable.
#
# @return [Object] the stored client, or nil when it has not been set
def client = @client

#instance ⇒ Object (readonly)

Returns the value of attribute instance.



7
8
9
# File 'lib/kafka_rest/consumer_stream.rb', line 7

# Reader for the +@instance+ instance variable.
#
# @return [Object] the stored consumer instance, or nil when it has not been set
def instance = @instance

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



7
8
9
# File 'lib/kafka_rest/consumer_stream.rb', line 7

# Reader for the +@topic+ instance variable.
#
# @return [Object] the stored topic, or nil when it has not been set
def topic = @topic

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/kafka_rest/consumer_stream.rb', line 37

# Reports whether the stream is still flagged as active.
#
# The stored flag is coerced to a strict boolean, so an unset or nil flag
# reads as false and any other truthy value reads as true.
#
# @return [Boolean]
def active?
  @active ? true : false
end

#read ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/kafka_rest/consumer_stream.rb', line 16

# Polls the consumer endpoint until the stream is no longer active.
#
# Each pass requests +consume_path+ through +client+ and parses the response
# body as JSON. An empty payload is skipped. Otherwise the payload is emitted
# as +:error+ when the HTTP status is 400 or above, or as +:data+ (with each
# message passed through +decode+) for any lower status.
#
# After every pass the active flag is checked. Once it is false, +:end+ is
# emitted, the stored cleanup proc (if any) is called, and the loop exits.
#
# @return [nil]
def read
  loop do
    client.request(consume_path) do |res|
      messages = JSON.parse(res.body.to_s)
      # Nothing to deliver: leave the request block and fall through to the
      # active check below.
      break if messages.empty?

      # 400 (Bad Request) is itself an error status, so the comparison must
      # be inclusive; otherwise its error body would be decoded as data.
      if res.code.to_i >= 400
        emit(:error, messages)
      else
        emit(:data, messages.map(&decode))
      end
    end

    next if active?

    emit(:end)
    @cleanup.call if @cleanup.is_a?(Proc)
    break # out of read loop
  end
end

#shutdown!(&block) ⇒ Object



41
42
43
44
# File 'lib/kafka_rest/consumer_stream.rb', line 41

# Flags the stream as inactive and optionally records a cleanup callback.
#
# When a block is supplied it is stored in +@cleanup+; without a block any
# previously stored callback is left untouched.
#
# @yield optional callback to store for later cleanup
# @return [Proc, nil] the stored block, or nil when no block was given
def shutdown!(&block)
  @active = false
  @cleanup = block unless block.nil?
end