Module: Datadog::Tracing::Contrib::Karafka::Monitor
- Defined in:
- lib/datadog/tracing/contrib/karafka/monitor.rb
Overview
Custom monitor for Karafka. Creating a custom monitor, instead of subscribing to an event (e.g. ‘Karafka.monitor.subscribe ’worker.processed’‘), is required because event subscriptions cannot wrap the event execution (`yield`).
Constant Summary collapse
- TRACEABLE_EVENTS =
%w[ worker.processed ].freeze
Instance Method Summary collapse
Instance Method Details
permalink #instrument(event_id, payload = EMPTY_HASH, &block) ⇒ Object
[View source]
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/datadog/tracing/contrib/karafka/monitor.rb', line 18 def instrument(event_id, payload = EMPTY_HASH, &block) return super unless TRACEABLE_EVENTS.include?(event_id) Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span| job = payload[:job] job_type = fetch_job_type(job.class) consumer = job.executor.topic.consumer action = case job_type when 'Periodic', 'PeriodicNonBlocking' 'tick' when 'Shutdown' 'shutdown' when 'Revoked', 'RevokedNonBlocking' 'revoked' when 'Idle' 'idle' when 'Eofed', 'EofedNonBlocking' 'eofed' else 'consume' end span.resource = "#{consumer}##{action}" if action == 'consume' span.set_tag(Ext::TAG_MESSAGE_COUNT, job..count) span.set_tag(Ext::TAG_PARTITION, job.executor.partition) span.set_tag(Ext::TAG_OFFSET, job..first..offset) span.set_tag(Ext::TAG_CONSUMER, consumer) span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, job.executor.topic.name) span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) end super end end |