Module: Datadog::Tracing::Contrib::Kafka::Instrumentation::Consumer::InstanceMethods

Defined in:
lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb

Overview

Instance methods for consumer instrumentation

Instance Method Summary

Instance Method Details

#each_batch(**kwargs, &block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb', line 39

# Wraps batch consumption so that a Data Streams Monitoring (DSM) consume
# checkpoint is recorded for every batch before the caller's block runs.
#
# When +Datadog::DataStreams.enabled?+ is false the call is forwarded to
# +super+ untouched (same keyword arguments, same block).
#
# @param kwargs [Hash] keyword arguments forwarded as-is to +super+
# @yieldparam batch [#topic] the batch handed over by +super+
# @return [Object] whatever +super+ returns
def each_batch(**kwargs, &block)
  return super unless Datadog::DataStreams.enabled?

  wrapped_block = proc do |batch|
    Datadog.logger.debug { "Kafka each_batch: DSM enabled for topic #{batch.topic}" }

    begin
      Datadog::DataStreams.set_consume_checkpoint(
        type: 'kafka',
        source: batch.topic,
        auto_instrumentation: true
      )
    rescue => e
      # Best effort: a checkpoint failure is only logged so the caller's
      # block below still runs. The block form keeps the message from being
      # built unless it is actually emitted, matching the debug call above.
      Datadog.logger.debug { "Error setting DSM checkpoint: #{e.class}: #{e}" }
    end

    # Invoke the caller's block explicitly; when none was given the batch is
    # simply not forwarded.
    block.call(batch) if block
  end

  super(**kwargs, &wrapped_block)
end

#each_message(**kwargs, &block) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb', line 16

# Wraps message consumption so that a Data Streams Monitoring (DSM) consume
# checkpoint is recorded for every message before the caller's block runs.
#
# When +Datadog::DataStreams.enabled?+ is false the call is forwarded to
# +super+ untouched (same keyword arguments, same block).
#
# @param kwargs [Hash] keyword arguments forwarded as-is to +super+
# @yieldparam message [#topic, #headers] the message handed over by +super+
# @return [Object] whatever +super+ returns
def each_message(**kwargs, &block)
  return super unless Datadog::DataStreams.enabled?

  wrapped_block = proc do |message|
    Datadog.logger.debug { "Kafka each_message: DSM enabled for topic #{message.topic}" }

    begin
      # A message without headers is treated as having an empty header set.
      headers = message.headers || {}
      # The block lets the checkpoint code look up individual header values
      # by key (presumably to read propagated DSM context - confirm against
      # Datadog::DataStreams.set_consume_checkpoint).
      Datadog::DataStreams.set_consume_checkpoint(
        type: 'kafka',
        source: message.topic,
        auto_instrumentation: true
      ) { |key| headers[key] }
    rescue => e
      # Best effort: a checkpoint failure is only logged so the caller's
      # block below still runs. The block form keeps the message from being
      # built unless it is actually emitted, matching the debug call above.
      Datadog.logger.debug { "Error setting DSM checkpoint: #{e.class}: #{e}" }
    end

    # Invoke the caller's block explicitly; when none was given the message
    # is simply not forwarded.
    block.call(message) if block
  end

  super(**kwargs, &wrapped_block)
end