Class: Output::KafkaPlugin

Inherits:
OutputPlugin show all
Defined in:
lib/fileminer/output/kafka.rb

Instance Method Summary collapse

Methods inherited from OutputPlugin

#batch?

Constructor Details

#initialize(options) ⇒ KafkaPlugin

Create a kafka output plugin instance

Parameters:

Options Hash (options):

  • :brokers (Array) — default: ['localhost:9092']
  • :client_id (String) — default: 'fileminer'
  • :topic (String) — default: 'fileminer'
  • :mode (Symbol) — default: :sync

    :sync or :async

  • :auto_delivery (Symbol) — default: :disabled

    :disabled or :enabled

  • :delivery_conf (Hash)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fileminer/output/kafka.rb', line 19

def initialize(options)
  brokers = options[:brokers] || ['localhost:9092']
  client_id = options[:client_id] || 'fileminer'
  @topic = options[:topic] || 'fileminer'
  @kafka = Kafka.new(brokers, client_id: client_id)
  case @mode = options[:mode]
  when :sync
    @producer = @kafka.producer
  when :async
    case @auto_delivery = options[:auto_delivery]
    when :disabled
      @producer = @kafka.async_producer
    when :enabled
      @producer = @kafka.async_producer options[:delivery_conf]
    else
      raise "invalid value #@auto_delivery of auto_delivery"
    end
  else
    raise "unsupported mode #@mode"
  end
end

Instance Method Details

#closeObject

close the kafka producer



55
56
57
58
# File 'lib/fileminer/output/kafka.rb', line 55

def close
  @producer.shutdown
  @kafka.close
end

#send_all(lines) { ... } ⇒ Object

Send all lines to kafka using producer API

Parameters:

  • lines (Array)

Yields:

  • a listener to be called after all lines just be delivered



45
46
47
48
49
50
51
52
# File 'lib/fileminer/output/kafka.rb', line 45

def send_all(lines, &listener)
  lines.each do |line|
    message = line.to_json
    @producer.produce(message, topic: @topic)
  end
  @producer.deliver_messages unless @mode == :async and @auto_delivery == :enabled
  listener.call
end