Class: Datadog::DI::ProbeNotifierWorker (private API)

Inherits:
Object
Defined in:
lib/datadog/di/probe_notifier_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or changed in the future.

Background worker thread for sending probe statuses and snapshots to the backend (via the agent).

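The loop inside the worker rescues all StandardError exceptions so that the thread is not terminated by errors raised in downstream code, including communication and protocol errors when sending events to the agent. Rescued errors are logged at debug level and reported to telemetry when it is configured; if settings.dynamic_instrumentation.internal.propagate_all_exceptions is enabled, they are re-raised instead.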
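A minimal sketch of this error-isolation pattern in plain Ruby, independent of the library. run_isolated and its propagate: and on_error: parameters are hypothetical stand-ins for the worker loop, the propagate_all_exceptions setting, and the logging/telemetry calls.

```ruby
# Hypothetical helper (not part of the datadog gem): runs each job and keeps
# going when one raises, like the rescue in the worker loop.
#   propagate: stands in for dynamic_instrumentation.internal.propagate_all_exceptions
#   on_error:  stands in for the logger.debug / telemetry&.report calls
def run_isolated(jobs, propagate: false, on_error: ->(_exc) {})
  results = []
  jobs.each do |job|
    begin
      results << job.call
    rescue => exc # a bare rescue catches StandardError only
      raise if propagate # re-raise the current exception unchanged
      on_error.call(exc)
    end
  end
  results
end

errors = []
jobs = [-> { 1 }, -> { raise IOError, "agent unreachable" }, -> { 3 }]
p run_isolated(jobs, on_error: ->(exc) { errors << exc.class }) # prints [1, 3]
p errors                                                        # prints [IOError]
```

Rescuing StandardError rather than Exception still lets Interrupt and SystemExit escape, so the thread can be shut down normally.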

The worker groups the data to send into batches, aiming to perform no more than one network operation per event type (probe status or snapshot) per second. The sending queue is also length-limited so that it cannot grow without bound if upstream code generates an enormous number of events.
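The batching policy can be sketched as follows. BatchingQueue, MAX_QUEUE, min_interval and the injectable clock are hypothetical, and the sketch is single-threaded (the worker itself guards its queues with a Mutex). Dropping events once a queue is full is an assumption; this page does not show how the worker handles overflow.

```ruby
# Hypothetical BatchingQueue (not the gem's API): per-type queues with a cap,
# sent as one batch per type at most once per min_interval seconds.
class BatchingQueue
  MAX_QUEUE = 3 # illustrative cap; the worker's real limit is not shown here

  attr_reader :dropped

  def initialize(min_interval: 1.0, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @queues = Hash.new { |hash, type| hash[type] = [] }
    @last_sent = {}
    @min_interval = min_interval
    @clock = clock
    @dropped = 0
  end

  # Queues an event; returns false and counts a drop if the queue is full.
  def add(type, event)
    queue = @queues[type]
    if queue.length >= MAX_QUEUE
      @dropped += 1
      return false
    end
    queue << event
    true
  end

  # Returns the pending batch for type, or nil if there is nothing to send
  # or the previous batch of this type went out less than min_interval ago.
  def take_batch(type)
    return nil if @queues[type].empty?
    now = @clock.call
    last = @last_sent[type]
    return nil if last && now - last < @min_interval
    @last_sent[type] = now
    @queues.delete(type) # removes and returns the whole batch
  end
end

time = 0.0
batches = BatchingQueue.new(clock: -> { time })
4.times { |i| batches.add(:snapshot, i) }
p batches.dropped               # prints 1
p batches.take_batch(:snapshot) # prints [0, 1, 2]
batches.add(:snapshot, 9)
p batches.take_batch(:snapshot) # prints nil (within the 1 s interval)
time = 1.5
p batches.take_batch(:snapshot) # prints [9]
```

Injecting the clock as a lambda keeps the rate-limit logic testable without real sleeps.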

The thread sleeps while there is no work to do. It is woken by wake-up events delivered via a ConditionVariable (the Core::Semaphore held in @wake), or by a timeout when another send is already scheduled.
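A sketch of the wake-up mechanism using Ruby's Mutex and ConditionVariable. WakeSignal is hypothetical and only approximates the signal / wait(timeout) interface the worker calls on its wake object. Because a ConditionVariable does not remember signals sent while nobody is waiting, the demo loop uses a timed wait so that a lost signal only delays it.

```ruby
# Hypothetical WakeSignal (not the gem's Core::Semaphore): a Mutex +
# ConditionVariable pair offering signal and wait(timeout).
class WakeSignal
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  # Wakes one waiting thread; a signal sent while nobody waits is lost.
  def signal
    @mutex.synchronize { @cv.signal }
  end

  # Sleeps until signalled or until timeout seconds pass (nil = no limit).
  def wait(timeout = nil)
    @mutex.synchronize { @cv.wait(@mutex, timeout) }
  end
end

lock = Mutex.new
pending = []
sent = []
stop = false
wake = WakeSignal.new

worker = Thread.new do
  loop do
    # Take everything queued and read the stop flag atomically, so events
    # queued before the stop request are always included in this batch.
    batch, stopping = lock.synchronize { [pending.shift(pending.length), stop] }
    sent.concat(batch) # stands in for sending to the agent
    break if stopping
    wake.wait(0.1) # the timeout bounds the delay if a signal was lost
  end
end

lock.synchronize { pending << :event }
wake.signal
lock.synchronize { stop = true }
wake.signal
worker.join
p sent # prints [:event]
```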

Constructor Details

#initialize(settings, logger, agent_settings:, probe_repository:, probe_notification_builder:, telemetry: nil) ⇒ ProbeNotifierWorker

Returns a new instance of ProbeNotifierWorker.

Parameters:

  • probe_repository (ProbeRepository)

    Repository for looking up probes. Used for handling serialization errors (disabling affected probes).

  • probe_notification_builder (ProbeNotificationBuilder)

    Builder for creating status notifications. Used for reporting ERROR status.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 30

def initialize(settings, logger, agent_settings:,
  probe_repository:, probe_notification_builder:, telemetry: nil)
  @settings = settings
  @telemetry = telemetry
  @status_queue = []
  @snapshot_queue = []
  @agent_settings = agent_settings
  @logger = logger
  @lock = Mutex.new
  @wake = Core::Semaphore.new
  @io_in_progress = false
  @sleep_remaining = nil
  @wake_scheduled = false
  @thread = nil
  @pid = nil
  @flush = 0
  @probe_repository = probe_repository
  @probe_notification_builder = probe_notification_builder
end

Instance Attribute Details

#agent_settings ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 53

def agent_settings
  @agent_settings
end

#logger ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 51

def logger
  @logger
end

#probe_notification_builder ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 55

def probe_notification_builder
  @probe_notification_builder
end

#probe_repository ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 54

def probe_repository
  @probe_repository
end

#settings ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 50

def settings
  @settings
end

#telemetry ⇒ Object (readonly)

# File 'lib/datadog/di/probe_notifier_worker.rb', line 52

def telemetry
  @telemetry
end

Instance Method Details

#flush ⇒ Object

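Waits for the background thread to send pending notifications.

This method waits for the notification queues to become empty (and for any in-progress send to finish) rather than for a particular set of notifications to be sent out. It should therefore only be called when no other thread is generating notifications at the same time. While a flush is pending, the worker skips its cooldown period and sends immediately; the caller polls every 0.25 seconds, signalling the worker whenever the queues are non-empty, and the method returns immediately if the worker thread is not running.

The test suite uses this method to wait until notifications have been sent out, and it could also be used to stop the worker thread gracefully.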
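The flush protocol can be sketched with a simplified worker. FlushableWorker, its one-hour default cooldown and the 0.01 s polling interval (instead of 0.25 s) are assumptions chosen for illustration; unlike the real method, the sketch does not signal the worker, which polls on its own. The worker removes the batch from the queue and sets the I/O flag under the same lock, so flush never observes an empty queue while a batch is still unsent.

```ruby
# Hypothetical FlushableWorker (not the gem's class) illustrating the flush
# counter and the empty-queue / in-progress-I/O check.
class FlushableWorker
  def initialize(cooldown: 3600)
    @lock = Mutex.new
    @queue = []
    @sent = []
    @flush = 0
    @io_in_progress = false
    @cooldown = cooldown
    @last_send = now
    @thread = Thread.new { run }
  end

  attr_reader :sent

  def add(item)
    @lock.synchronize { @queue << item }
  end

  # Returns once the queue is empty and no batch is being sent.
  def flush
    @lock.synchronize { @flush += 1 }
    loop do
      return unless @thread.alive?
      busy, empty = @lock.synchronize { [@io_in_progress, @queue.empty?] }
      break if !busy && empty
      sleep 0.01 # poll instead of spinning
    end
  ensure
    @lock.synchronize { @flush -= 1 }
  end

  def stop
    @thread.kill
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def run
    loop do
      flushing = @lock.synchronize { @flush > 0 }
      # Outside a flush, respect the cooldown between sends.
      if !flushing && now - @last_send < @cooldown
        sleep 0.01
        next
      end
      # Take the batch and mark I/O as in progress atomically.
      batch = @lock.synchronize do
        @io_in_progress = !@queue.empty?
        @queue.shift(@queue.length)
      end
      if batch.empty?
        sleep 0.01
        next
      end
      @sent.concat(batch) # stands in for the network request
      @last_send = now
      @lock.synchronize { @io_in_progress = false }
    end
  end
end

fw = FlushableWorker.new
fw.add(:status)
fw.add(:snapshot)
fw.flush
p fw.sent # prints [:status, :snapshot]
fw.stop
```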



# File 'lib/datadog/di/probe_notifier_worker.rb', line 137

def flush
  @lock.synchronize do
    @flush += 1
  end
  begin
    loop do
      if @thread.nil? || !@thread.alive?
        return
      end

      io_in_progress, queues_empty = @lock.synchronize do
        [io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
      end

      if io_in_progress
        # If we just call Thread.pass we could be in a busy loop -
        # add a sleep.
        sleep 0.25
        next
      elsif queues_empty
        break
      else
        wake.signal
        sleep 0.25
        next
      end
    end
  ensure
    @lock.synchronize do
      @flush -= 1
    end
  end
end

#start ⇒ void

This method returns an undefined value.

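Starts the background worker thread.

The thread batches probe statuses and snapshots and sends them to the agent. If the process forks, the thread is automatically restarted in the child: start records the PID of the process that created the thread, so calling it again in the same process does nothing, whereas in a forked child, whose PID differs, it starts a new thread.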
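The PID guard at the top of start can be illustrated in isolation. ForkAwareWorker is hypothetical, and its thread body is a placeholder for the real send loop; the fork part of the demo runs only on platforms where Process.fork is available.

```ruby
# Hypothetical ForkAwareWorker (not the gem's class) showing the PID guard.
class ForkAwareWorker
  attr_reader :starts

  def initialize
    @thread = nil
    @pid = nil
    @starts = 0
  end

  def start
    # Same process and a thread already created: nothing to do.
    return if @thread && @pid == Process.pid
    @starts += 1
    @thread = Thread.new { sleep } # placeholder for the real send loop
    @pid = Process.pid
  end

  def stop
    @thread&.kill
    @thread = nil
  end
end

worker = ForkAwareWorker.new
worker.start
worker.start
p worker.starts # prints 1

if Process.respond_to?(:fork)
  child = fork do
    # Only the forking thread survives in the child and Process.pid differs,
    # so start creates a new thread here.
    worker.start
    exit!(worker.starts == 2 ? 0 : 1)
  end
  Process.wait(child)
  p $?.success? # prints true
end
worker.stop
```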



# File 'lib/datadog/di/probe_notifier_worker.rb', line 63

def start
  return if @thread && @pid == Process.pid
  logger.trace { "di: starting probe notifier: pid #{$$}" }
  @thread = Thread.new do
    loop do
      # TODO If stop is requested, we stop immediately without
      # flushing the submissions. Should we send pending submissions
      # and then quit?
      break if @stop_requested

      # If a flush was requested, send immediately and do not
      # wait for the cooldown period.
      if @lock.synchronize { @flush } == 0
        sleep_remaining = @lock.synchronize do
          if sleep_remaining && sleep_remaining > 0
            # Recalculate how much sleep time is remaining, then sleep that long.
            set_sleep_remaining
          else
            0
          end
        end

        if sleep_remaining > 0
          # Do not need to update @wake_scheduled here because
          # wake-up is already scheduled for the earliest possible time.
          wake.wait(sleep_remaining)
          next
        end
      end

      begin
        more = maybe_send
      rescue => exc
        raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

        logger.debug { "di: error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
        telemetry&.report(exc, description: "Error in probe notifier worker")
      end
      @lock.synchronize do
        @wake_scheduled = more
      end
      wake.wait(more ? min_send_interval : nil)
    end
  end
  @pid = Process.pid
end

#stop(timeout = 1) ⇒ Object

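Stops the background thread.

Sets a stop flag, wakes the thread, and waits up to timeout seconds for it to exit (Thread#join); if it is still running after that, it is killed with Thread#kill. Queued notifications are not flushed first; call #flush beforehand if they must be delivered.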
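The join-then-kill fallback reduces to a few lines. stop_thread is a hypothetical helper, and the stop_requested flag stands in for @stop_requested; Thread#join(limit) returns nil when the limit expires, which is what triggers Thread#kill.

```ruby
# Hypothetical stop_thread helper (not the gem's API): wait up to timeout
# seconds for the thread to finish, then fall back to Thread#kill.
def stop_thread(thread, timeout: 1)
  finished = thread.join(timeout) # nil if the thread is still running
  thread.kill unless finished
  finished ? :graceful : :killed
end

stop_requested = false
cooperative = Thread.new { sleep 0.01 until stop_requested }
stop_requested = true
p stop_thread(cooperative)          # prints :graceful

stuck = Thread.new { sleep }         # never checks for a stop request
p stop_thread(stuck, timeout: 0.05) # prints :killed
```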



# File 'lib/datadog/di/probe_notifier_worker.rb', line 114

def stop(timeout = 1)
  @stop_requested = true
  logger.trace { "di: stopping probe notifier: pid #{$$}" }
  wake.signal
  if thread
    unless thread.join(timeout)
      thread.kill
    end
    @thread = nil
  end
end