Module: Datadog::Tracing::Contrib::Kafka::Instrumentation::Consumer::InstanceMethods
- Defined in:
- lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb
Overview
Instance methods for consumer instrumentation
Instance Method Summary collapse
Instance Method Details
#each_batch(**kwargs, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb', line 39 def each_batch(**kwargs, &block) return super unless Datadog::DataStreams.enabled? wrapped_block = proc do |batch| Datadog.logger.debug { "Kafka each_batch: DSM enabled for topic #{batch.topic}" } begin Datadog::DataStreams.set_consume_checkpoint( type: 'kafka', source: batch.topic, auto_instrumentation: true ) rescue => e Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end yield(batch) if block end super(**kwargs, &wrapped_block) end |
#each_message(**kwargs, &block) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb', line 16 def (**kwargs, &block) return super unless Datadog::DataStreams.enabled? wrapped_block = proc do || Datadog.logger.debug { "Kafka each_message: DSM enabled for topic #{message.topic}" } begin headers = .headers || {} Datadog::DataStreams.set_consume_checkpoint( type: 'kafka', source: .topic, auto_instrumentation: true ) { |key| headers[key] } rescue => e Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end yield() if block end super(**kwargs, &wrapped_block) end |