Module: Datadog::Tracing::Contrib::Kafka::Instrumentation::Producer::InstanceMethods
- Defined in:
- lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb
Instance Method Summary collapse
Instance Method Details
#deliver_messages(**kwargs) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb', line 15 def (**kwargs) if Datadog::DataStreams.enabled? begin = instance_variable_get(:@pending_message_queue) if && !.empty? .each do || .headers ||= {} Datadog::DataStreams.set_produce_checkpoint( type: 'kafka', destination: .topic, auto_instrumentation: true ) do |key, value| .headers[key] = value end end end rescue => e Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end end super end |
#send_messages(messages, **kwargs) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb', line 40 def (, **kwargs) if Datadog::DataStreams.enabled? begin .each do || [:headers] ||= {} Datadog::DataStreams.set_produce_checkpoint( type: 'kafka', destination: [:topic], auto_instrumentation: true ) do |key, value| [:headers][key] = value end end rescue => e Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end end super end |