Class: Kafka::Datadog::ProducerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Datadog::ProducerSubscriber
- Defined in:
- lib/kafka/datadog.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Instance Method Details
permalink #ack_message(event) ⇒ Object
[View source]
338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/kafka/datadog.rb', line 338

# Handles the `ack_message.producer.kafka` notification: a message was ACK'd
# by the broker for a topic.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :topic and :delay (seconds between produce and ACK).
# @return [void]
def ack_message(event)
  tags = {
    client: event.payload.fetch(:client_id),
    topic: event.payload.fetch(:topic),
  }

  # Number of messages ACK'd for the topic.
  increment("producer.ack.messages", tags: tags)

  # Histogram of delay between a message being produced and it being ACK'd.
  histogram("producer.ack.delay", event.payload.fetch(:delay), tags: tags)
end
permalink #buffer_overflow(event) ⇒ Object
[View source]
307 308 309 310 311 312 313 314 |
# File 'lib/kafka/datadog.rb', line 307

# Handles the `buffer_overflow.producer.kafka` notification: a message could
# not be buffered because the producer's buffer was full.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id and :topic.
# @return [void]
def buffer_overflow(event)
  tags = {
    client: event.payload.fetch(:client_id),
    topic: event.payload.fetch(:topic),
  }

  # A dropped message counts as a produce error.
  increment("producer.produce.errors", tags: tags)
end
permalink #deliver_messages(event) ⇒ Object
[View source]
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/kafka/datadog.rb', line 316

# Handles the `deliver_messages.producer.kafka` notification: a batch of
# buffered messages was flushed to the brokers.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :delivered_message_count and :attempts; :exception is present
#   when delivery failed. `event.duration` is the delivery latency in ms.
# @return [void]
def deliver_messages(event)
  client = event.payload.fetch(:client_id)
  message_count = event.payload.fetch(:delivered_message_count)
  attempts = event.payload.fetch(:attempts)

  tags = {
    client: client,
  }

  # ActiveSupport adds :exception to the payload when the instrumented
  # block raised, so this tracks failed deliveries.
  if event.payload.key?(:exception)
    increment("producer.deliver.errors", tags: tags)
  end

  timing("producer.deliver.latency", event.duration, tags: tags)

  # Messages delivered to Kafka:
  count("producer.deliver.messages", message_count, tags: tags)

  # Number of attempts to deliver messages:
  histogram("producer.deliver.attempts", attempts, tags: tags)
end
permalink #produce_message(event) ⇒ Object
[View source]
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/kafka/datadog.rb', line 276

# Handles the `produce_message.producer.kafka` notification: a message was
# appended to the producer's buffer.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :topic, :message_size, :buffer_size and :max_buffer_size.
# @return [void]
def produce_message(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)
  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)
  max_buffer_size = event.payload.fetch(:max_buffer_size)
  buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f
  buffer_fill_percentage = buffer_fill_ratio * 100.0

  tags = {
    client: client,
    topic: topic,
  }

  # This gets us the write rate.
  # NOTE(review): `tags` already carries :topic, so the merge below is a
  # no-op kept for fidelity with the original code.
  increment("producer.produce.messages", tags: tags.merge(topic: topic))

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic))

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic))

  # This gets us the avg/max buffer size per producer.
  histogram("producer.buffer.size", buffer_size, tags: tags)

  # This gets us the avg/max buffer fill ratio per producer.
  histogram("producer.buffer.fill_ratio", buffer_fill_ratio, tags: tags)
  histogram("producer.buffer.fill_percentage", buffer_fill_percentage, tags: tags)
end
permalink #topic_error(event) ⇒ Object
[View source]
351 352 353 354 355 356 357 358 |
# File 'lib/kafka/datadog.rb', line 351

# Handles the `topic_error.producer.kafka` notification: the broker returned
# an error for a topic when the producer awaited ACKs.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id and :topic.
# @return [void]
def topic_error(event)
  tags = {
    client: event.payload.fetch(:client_id),
    topic: event.payload.fetch(:topic)
  }

  increment("producer.ack.errors", tags: tags)
end