Class: LogStash::Inputs::Kafka
- Inherits: LogStash::Inputs::Base
  - Object
  - LogStash::Inputs::Base
  - LogStash::Inputs::Kafka
- Includes: PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
- Defined in: lib/logstash/inputs/kafka.rb
Overview
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
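For example, a minimal SSL-enabled consumer could be configured as in the following sketch (the broker address, truststore path, and password are placeholders):

input {
  kafka {
    bootstrap_servers => "broker1:9093"
    topics => ["logs"]
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/logstash/client.truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                             # placeholder password
  }
}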
The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.
Logstash instances by default form a single logical group to subscribe to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.
Ideally you should have as many threads as the number of partitions for a perfect balance; more threads than partitions means that some threads will be idle.
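As a sketch, two Logstash instances could share a hypothetical six-partition topic by each running three consumer threads under the same group:

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"  # hypothetical brokers
    topics => ["app-logs"]                          # hypothetical six-partition topic
    group_id => "logstash"                          # identical on both instances
    consumer_threads => 3                           # 2 instances x 3 threads = 6 partitions
  }
}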
For more information see kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs
Constant Summary
- DEFAULT_DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer"
- METADATA_NONE = Set[].freeze
- METADATA_BASIC = Set[:record_props].freeze
- METADATA_EXTENDED = Set[:record_props, :headers].freeze
- METADATA_DEPRECATION_MAP = { 'true' => 'basic', 'false' => 'none' }
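The deprecation map reflects that `decorate_events` was originally a boolean setting; legacy boolean values are remapped onto the newer mode names, so the following two forms should behave identically (sketch):

input { kafka { decorate_events => true } }     # deprecated boolean form, mapped to 'basic'
input { kafka { decorate_events => "basic" } }  # current equivalent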
Instance Attribute Summary
- #metadata_mode ⇒ Object (readonly)
  Returns the value of attribute metadata_mode.
Instance Method Summary
- #do_poll(consumer) ⇒ Object
- #handle_record(record, codec_instance, queue) ⇒ Object
- #initialize(params = {}) ⇒ Kafka (constructor)
  A new instance of Kafka.
- #kafka_consumers ⇒ Object
- #maybe_commit_offset(consumer) ⇒ Object
- #maybe_set_metadata(event, record) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
- #stop ⇒ Object
- #subscribe(consumer) ⇒ Object
- #thread_runner(logstash_queue, consumer, name) ⇒ Object
Methods included from PluginMixins::Kafka::AvroSchemaRegistry
#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Constructor Details
#initialize(params = {}) ⇒ Kafka
Returns a new instance of Kafka. Based on schema registry usage, the constructor changes the codec default: `json` when `schema_registry_url` is configured, `plain` otherwise.
# File 'lib/logstash/inputs/kafka.rb', line 276

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end

  super(params)
end
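In other words, the constructor only supplies a codec default when none is configured explicitly. A configuration sketch of the two cases (the registry URL and topic name are placeholders):

input { kafka { schema_registry_url => "http://registry:8081" } }  # codec defaults to json
input { kafka { topics => ["app-logs"] } }                         # codec defaults to plain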
Instance Attribute Details
#metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.

# File 'lib/logstash/inputs/kafka.rb', line 273

def metadata_mode
  @metadata_mode
end
Instance Method Details
#do_poll(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 361

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end
#handle_record(record, codec_instance, queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 380

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end
#kafka_consumers ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 334

def kafka_consumers
  @runner_consumers
end
#maybe_commit_offset(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 411

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
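As the guard clause shows, the plugin only calls commitSync when auto-commit is off; that behavior is opted into from the pipeline configuration (sketch, topic name is a placeholder):

input {
  kafka {
    topics => ["app-logs"]
    enable_auto_commit => false  # commit offsets via commitSync after each processed batch
  }
}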
#maybe_set_metadata(event, record) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 389

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select { |h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8)
      if s.valid_encoding?
        event.set("[@metadata][kafka][headers][" + header.key + "]", s)
      end
    end
  end
end
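The `[@metadata][kafka][...]` fields set here are only populated when `decorate_events` requests them; one way to inspect them is a sketch like the following (topic name is a placeholder):

input {
  kafka {
    topics => ["app-logs"]
    decorate_events => "extended"  # record props plus headers
  }
}
output {
  stdout { codec => rubydebug { metadata => true } }  # prints [@metadata][kafka][...] fields
}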
#register ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 285

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end
#run(logstash_queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 316

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i|
    thread_runner(logstash_queue, consumer, "kafka-input-worker-#{client_id}-#{i}")
  }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
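Note how `run` appends the thread index to `group_instance_id` when several threads are configured, keeping each static group member unique. With the hypothetical settings below, the brokers would see members `ls-node-1-0` and `ls-node-1-1`:

input {
  kafka {
    topics => ["app-logs"]
    consumer_threads => 2
    group_instance_id => "ls-node-1"  # expands to ls-node-1-0 and ls-node-1-1
  }
}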
#stop ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 328

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
#subscribe(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 338

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
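`subscribe` uses the pattern compiled in `register` when `topics_pattern` is set, and the literal `topics` list otherwise, so the two subscription styles are alternatives (sketch, topic names are placeholders):

# Literal topic list:
input { kafka { topics => ["orders", "payments"] } }

# Regular-expression subscription instead:
input { kafka { topics_pattern => "logs-.*" } }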
#thread_runner(logstash_queue, consumer, name) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 343

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end