Class: Output::KafkaPlugin
- Inherits: OutputPlugin
  - Object
  - OutputPlugin
  - Output::KafkaPlugin
- Defined in: lib/fileminer/output/kafka.rb
Instance Method Summary
- #close ⇒ Object
  Close the Kafka producer.
- #initialize(options) ⇒ KafkaPlugin (constructor)
  Create a Kafka output plugin instance.
- #send_all(lines) { ... } ⇒ Object
  Send all lines to Kafka using the producer API.
Methods inherited from OutputPlugin
Constructor Details
#initialize(options) ⇒ KafkaPlugin
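Create a Kafka output plugin instance. The :mode option (:sync or :async) selects the producer type; in :async mode, :auto_delivery (:enabled or :disabled) decides whether the producer delivers buffered messages on its own.
# File 'lib/fileminer/output/kafka.rb', line 19
def initialize(options)
  # Fall back to a local broker and the 'fileminer' client id and topic.
  brokers = options[:brokers] || ['localhost:9092']
  client_id = options[:client_id] || 'fileminer'
  @topic = options[:topic] || 'fileminer'
  @kafka = Kafka.new(brokers, client_id: client_id)
  case @mode = options[:mode]
  when :sync
    @producer = @kafka.producer
  when :async
    case @auto_delivery = options[:auto_delivery]
    when :disabled
      # Delivery is triggered explicitly from #send_all.
      @producer = @kafka.async_producer
    when :enabled
      # Delivery settings come from options[:delivery_conf].
      @producer = @kafka.async_producer options[:delivery_conf]
    else
      raise "invalid value #@auto_delivery of auto_delivery"
    end
  else
    raise "unsupported mode #@mode"
  end
end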
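The :mode and :auto_delivery options together select one of three producer setups, and any other combination raises. The sketch below mirrors that branching in a standalone helper so the valid combinations can be tried without a broker. The helper name `producer_kind`, the symbols it returns, and the keys shown inside `delivery_conf` are assumptions made for illustration; they are not part of fileminer.

```ruby
# Hypothetical helper mirroring the option checks in KafkaPlugin#initialize.
# It never touches Kafka; it only reports which producer the constructor
# would build for a given options hash.
def producer_kind(options)
  case mode = options[:mode]
  when :sync
    :sync_producer
  when :async
    case auto_delivery = options[:auto_delivery]
    when :disabled
      :async_producer_manual_delivery
    when :enabled
      :async_producer_auto_delivery
    else
      raise "invalid value #{auto_delivery} of auto_delivery"
    end
  else
    raise "unsupported mode #{mode}"
  end
end

sync_options = { brokers: ['localhost:9092'], topic: 'fileminer', mode: :sync }
async_options = {
  mode: :async,
  auto_delivery: :enabled,
  # Assumed shape: keyword options accepted by ruby-kafka's async_producer.
  delivery_conf: { delivery_threshold: 100, delivery_interval: 10 }
}

puts producer_kind(sync_options)
puts producer_kind(async_options)
```

The values are compared as symbols, so a configuration whose values arrive as strings (for example "sync" read from a YAML file) would fall through to the error branch if it were passed in without conversion.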
Instance Method Details
#close ⇒ Object
Shut down the Kafka producer, then close the Kafka client.
# File 'lib/fileminer/output/kafka.rb', line 55
def close
  @producer.shutdown
  @kafka.close
end
#send_all(lines) { ... } ⇒ Object
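Send all lines to Kafka using the producer API. Each line is serialized to JSON and buffered; the buffer is then delivered explicitly unless the producer is asynchronous with auto delivery enabled. The given block is called once the batch has been handed to the producer.
# File 'lib/fileminer/output/kafka.rb', line 45
def send_all(lines, &listener)
  lines.each do |line|
    message = line.to_json
    @producer.produce(message, topic: @topic)
  end
  @producer.deliver_messages unless @mode == :async and @auto_delivery == :enabled
  listener.call
end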
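The delivery rule in #send_all can be exercised without a broker. The following is a minimal sketch, assuming a hypothetical `RecordingProducer` stand-in that only records calls, and a hypothetical `send_all_sketch` method that takes the plugin's state as arguments. The shape of the sample line (a hash with 'path' and 'data' keys) is also an assumption. Unlike the original, the sketch skips the callback when no block is given.

```ruby
require 'json'

# Hypothetical stand-in for a ruby-kafka producer; it only records calls.
class RecordingProducer
  attr_reader :buffered, :deliveries

  def initialize
    @buffered = []
    @deliveries = 0
  end

  def produce(message, topic:)
    @buffered << [topic, message]
  end

  def deliver_messages
    @deliveries += 1
  end
end

# Mirrors the flow of KafkaPlugin#send_all with the state passed in explicitly.
def send_all_sketch(producer, lines, topic:, mode:, auto_delivery: nil)
  lines.each do |line|
    producer.produce(line.to_json, topic: topic)
  end
  # Flush explicitly unless the async producer is set up to deliver by itself.
  producer.deliver_messages unless mode == :async && auto_delivery == :enabled
  yield if block_given?
end

# Assumed line shape, for illustration only.
lines = [{ 'path' => '/var/log/app.log', 'data' => 'hello' }]

sync = RecordingProducer.new
send_all_sketch(sync, lines, topic: 'fileminer', mode: :sync) { puts 'batch handed off' }

auto = RecordingProducer.new
send_all_sketch(auto, lines, topic: 'fileminer', mode: :async, auto_delivery: :enabled)

puts "explicit deliveries in sync mode: #{sync.deliveries}"
puts "explicit deliveries in async auto mode: #{auto.deliveries}"
```

With a real ruby-kafka async producer, deliver_messages returns immediately and the actual delivery happens on a background thread, so in :async mode the callback may run before the broker has acknowledged the batch.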