Module: Datadog::Tracing::Contrib::Karafka::Monitor

Defined in:
lib/datadog/tracing/contrib/karafka/monitor.rb

Overview

Custom monitor for Karafka. Creating a custom monitor, instead of subscribing to an event (e.g. ‘Karafka.monitor.subscribe ’worker.processed’‘), is required because event subscriptions cannot wrap the event execution (`yield`).

Constant Summary collapse

TRACEABLE_EVENTS =
%w[
  worker.processed
].freeze

Instance Method Summary collapse

Instance Method Details

#instrument(event_id, payload = EMPTY_HASH, &block) ⇒ Object

[View source]

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/datadog/tracing/contrib/karafka/monitor.rb', line 18

def instrument(event_id, payload = EMPTY_HASH, &block)
  return super unless TRACEABLE_EVENTS.include?(event_id)

  Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span|
    job = payload[:job]
    job_type = fetch_job_type(job.class)
    consumer = job.executor.topic.consumer

    action = case job_type
             when 'Periodic', 'PeriodicNonBlocking'
               'tick'
             when 'Shutdown'
               'shutdown'
             when 'Revoked', 'RevokedNonBlocking'
               'revoked'
             when 'Idle'
               'idle'
             when 'Eofed', 'EofedNonBlocking'
               'eofed'
             else
               'consume'
             end

    span.resource = "#{consumer}##{action}"

    if action == 'consume'
      span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
      span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
      span.set_tag(Ext::TAG_OFFSET, job.messages.first..offset)
      span.set_tag(Ext::TAG_CONSUMER, consumer)
      span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, job.executor.topic.name)
      span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)
    end

    super
  end
end