Class: LogStash::Outputs::Kafka
- Inherits: LogStash::Outputs::Base
  - Object > LogStash::Outputs::Base > LogStash::Outputs::Kafka
- Includes: PluginMixins::Kafka::Common
- Defined in: lib/logstash/outputs/kafka.rb
Overview
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:
| Kafka Client Version | Logstash Version | Plugin Version | Why? |
|---|---|---|---|
| 0.8 | 2.0.0 - 2.x.x | <3.0.0 | Legacy, 0.8 is still popular |
| 0.9 | 2.0.0 - 2.3.x | 3.x.x | Works with the old Ruby Event API (`event['price'] = 10`) |
| 0.9 | 2.4.x - 5.x.x | 4.x.x | Works with the new getter/setter APIs (`event.set('[price]', 10)`) |
| 0.10.0.x | 2.4.x - 5.x.x | 5.x.x | Not compatible with the <= 0.9 broker |
| 0.10.1.x | 2.4.x - 5.x.x | 6.x.x | |
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This output supports connecting to Kafka over:

- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed, as sketched below.
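For example, an SSL-enabled output might look like the following. This is a sketch only: `security_protocol`, `ssl_truststore_location`, and `ssl_truststore_password` follow the plugin's documented option names, but the broker address, path, and password are placeholders.

```ruby
output {
  kafka {
    topic_id                => "mytopic"
    bootstrap_servers       => "broker1:9093"
    security_protocol       => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                 # placeholder secret
  }
}
```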
The only required configuration is the topic_id. The default codec is plain, so events will be persisted on the broker in plain format. Logstash will encode your events with not only the message field but also with a timestamp and hostname. If you want only the message itself to pass through, configure the output like this:
```ruby
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
```
For more information see kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
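Most producer-level settings are surfaced as plugin options whose names mirror the Kafka producer configs. A sketch with illustrative values (not tuning recommendations; verify the exact option set against your plugin version):

```ruby
output {
  kafka {
    topic_id          => "mytopic"
    bootstrap_servers => "broker1:9092,broker2:9092"
    acks              => "1"       # how many acknowledgments the producer requires
    batch_size        => 16384     # bytes buffered per partition before sending
    linger_ms         => 5         # wait up to 5 ms to fill batches
    compression_type  => "snappy"  # "none", "gzip", "snappy", or "lz4"
  }
}
```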
Instance Method Summary
- #close ⇒ Object
- #multi_receive(events) ⇒ Object
- #prepare(record) ⇒ Object
- #register ⇒ Object
- #retrying_send(batch) ⇒ Object
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Instance Method Details
#close ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 329

def close
  @producer.close
end
```
#multi_receive(events) ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 238

def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
```
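The flow here is indirect: `multi_receive` encodes each event, the codec's `on_event` hook (installed in `#register`) hands the serialized payload to `write_to_kafka`, which builds a producer record and calls `#prepare` to append it to the calling thread's batch; the filled batch is then flushed through `#retrying_send`. A minimal, self-contained sketch of the per-thread batching pattern (the names below are illustrative, not plugin code):

```ruby
require 'concurrent'

# One batch per worker thread, so appends need no locking.
thread_batch_map = Concurrent::Hash.new

threads = 4.times.map do |n|
  Thread.new do
    t = Thread.current
    thread_batch_map[t] ||= []  # the plugin uses java.util.ArrayList
    5.times { |i| thread_batch_map[t] << "event-#{n}-#{i}" }

    batch = thread_batch_map[t]
    unless batch.empty?
      puts "thread #{n}: flushing #{batch.size} records"  # stands in for retrying_send
      batch.clear
    end
  end
end
threads.each(&:join)
```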
#prepare(record) ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 233

def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
```
#register ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 201

def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil?
    if @retries < 0
      raise LogStash::ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end

  reassign_dns_lookup

  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
  end

  @message_headers.each do |key, value|
    if !key.is_a? String
      raise LogStash::ConfigurationError, "'message_headers' contains a key that is not a string!"
    end
  end

  @producer = create_producer
end
```
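A pipeline configuration that exercises the checks above might look like this (a sketch; the topic, header key, and header value are placeholders, and `message_headers` requires a plugin version that supports it):

```ruby
output {
  kafka {
    topic_id         => "mytopic"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    message_headers  => { "source" => "logstash" }  # header keys must be strings
  }
}
```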
#retrying_send(batch) ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 255

def retrying_send(batch)
  remaining = @retries

  while batch.any?
    unless remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.InterruptException,
             org.apache.kafka.common.errors.RetriableException => e
        logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
        failures << record
        nil
      rescue org.apache.kafka.common.KafkaException => e
        # This error is not retriable, drop event
        # TODO: add DLQ support
        logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message,
                    :record_value => record.value)
        nil
      end
    end

    futures.each_with_index do |future, i|
      # We cannot skip nils using `futures.compact` because then our index `i` will not align with `batch`
      unless future.nil?
        begin
          future.get
        rescue java.util.concurrent.ExecutionException => e
          # TODO(sissel): Add metric to count failures, possibly by exception type.
          if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
             e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
            logger.info("producer send failed, will retry sending", :exception => e.cause.class,
                        :message => e.cause.message)
            failures << batch[i]
          elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
            # This error is not retriable, drop event
            # TODO: add DLQ support
            logger.warn("producer send failed, dropping record", :exception => e.cause.class,
                        :message => e.cause.message, :record_value => batch[i].value)
          end
        end
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                  :failures => failures.size, :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
```
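In configuration terms: leaving `retries` unset makes the loop above retry retriable failures indefinitely, while a finite value drops the remaining batch once the count is exhausted, and `retry_backoff_ms` controls the sleep between rounds. An illustrative sketch (values are placeholders, not recommendations):

```ruby
output {
  kafka {
    topic_id         => "mytopic"
    retries          => 3    # finite retry: remaining events are DROPPED once exhausted
    retry_backoff_ms => 500  # retrying_send sleeps 0.5 s between retry rounds
  }
}
```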