Module: Datadog::Tracing::Contrib::Kafka::Instrumentation::Producer::InstanceMethods

Defined in:
lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb

Instance Method Summary collapse

Instance Method Details

#deliver_messages(**kwargs) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb', line 15

def deliver_messages(**kwargs)
  if Datadog::DataStreams.enabled?
    begin
      pending_messages = instance_variable_get(:@pending_message_queue)

      if pending_messages && !pending_messages.empty?
        pending_messages.each do |message|
          message.headers ||= {}
          Datadog::DataStreams.set_produce_checkpoint(
            type: 'kafka',
            destination: message.topic,
            auto_instrumentation: true
          ) do |key, value|
            message.headers[key] = value
          end
        end
      end
    rescue => e
      Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
    end
  end

  super
end

#send_messages(messages, **kwargs) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb', line 40

def send_messages(messages, **kwargs)
  if Datadog::DataStreams.enabled?
    begin
      messages.each do |message|
        message[:headers] ||= {}
        Datadog::DataStreams.set_produce_checkpoint(
          type: 'kafka',
          destination: message[:topic],
          auto_instrumentation: true
        ) do |key, value|
          message[:headers][key] = value
        end
      end
    rescue => e
      Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
    end
  end

  super
end