Class: ScoutApm::BackgroundJobIntegrations::SidekiqMiddleware

Inherits:
Object
  • Object
show all
Defined in:
lib/scout_apm/background_job_integrations/sidekiq.rb

Overview

We insert this middleware into the Sidekiq stack, to capture each job, and time them.

Constant Summary collapse

UNKNOWN_CLASS_PLACEHOLDER =
'UnknownJob'.freeze
ACTIVE_JOB_KLASS =

This name was changed in Sidekiq 8

if sidekiq_version_8?
  'Sidekiq::ActiveJob::Wrapper'.freeze
else
  'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper'.freeze
end
DELAYED_WRAPPER_KLASS =
'Sidekiq::Extensions::DelayedClass'.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.sidekiq_version_8?Boolean

Returns:

  • (Boolean)


72
73
74
75
76
77
78
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 72

def self.sidekiq_version_8?
  if defined?(::Sidekiq::VERSION)
    ::Sidekiq::VERSION.to_i >= 8
  else
    false
  end
end

Instance Method Details

#call(_worker, msg, queue) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 52

def call(_worker, msg, queue)
  req = ScoutApm::RequestManager.lookup
  req.annotate_request(:queue_latency => latency(msg))

  begin
    req.start_layer(ScoutApm::Layer.new('Queue', queue))
    started_queue = true
    req.start_layer(ScoutApm::Layer.new('Job', job_class(msg)))
    started_job = true

    yield
  rescue
    req.error!
    raise
  ensure
    req.stop_layer if started_job
    req.stop_layer if started_queue
  end
end

#job_class(msg) ⇒ Object

Capturing the class name is a little tricky, since we need to handle several cases:

  1. ActiveJob, with the class in the key ‘wrapped’

  2. ActiveJob, but the ‘wrapped’ key is wrong (due to YAJL serializing weirdly), find it in args.job_class

  3. DelayedJob wrapper, deserializing using YAML into the real object, which can be introspected

  4. No wrapper, just sidekiq’s class



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 95

def job_class(msg)
  job_class = msg.fetch('class', UNKNOWN_CLASS_PLACEHOLDER)

  if job_class == ACTIVE_JOB_KLASS && msg.key?('wrapped') && msg['wrapped'].is_a?(String)
    begin
      job_class = msg['wrapped'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == ACTIVE_JOB_KLASS && msg.try(:[], 'args').try(:[], 'job_class')
    begin
      job_class = msg['args']['job_class'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == DELAYED_WRAPPER_KLASS
    begin
      # Extract the info out of the wrapper
      yml = msg['args'].first
      deserialized_args = YAML.load(yml)
      klass, method, *rest = deserialized_args

      # If this is an instance of a class, get the class itself
      # Prevents instances from coming through named like "#<Foo:0x007ffd7a9dd8a0>"
      klass = klass.class unless klass.is_a? Module

      job_class = [klass, method].map(&:to_s).join(".")
    rescue
      DELAYED_WRAPPER_KLASS
    end
  end

  job_class
rescue
  UNKNOWN_CLASS_PLACEHOLDER
end

#latency(msg, time = Time.now.to_f) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 132

def latency(msg, time = Time.now.to_f)
  created_at = msg['enqueued_at'] || msg['created_at']
  if created_at
    # Sidekiq 8+ uses milliseconds, older versions use seconds.
    # Do it this way because downstream expects seconds.
    if self.class.sidekiq_version_8?
      # Convert milliseconds to seconds for consistency.
      (time - (created_at.to_f / 1000.0))
    else
      (time - created_at)
    end
  else
    0
  end
rescue
  0
end