Class: Datadog::DI::ProbeNotifierWorker Private
- Inherits: Object
  - Object
    - Datadog::DI::ProbeNotifierWorker
- Defined in: lib/datadog/di/probe_notifier_worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Background worker thread for sending probe statuses and snapshots to the backend (via the agent).
The loop inside the worker rescues all exceptions to prevent termination due to unhandled exceptions raised by any downstream code. This includes communication and protocol errors when sending the events to the agent.
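The rescue-and-continue pattern described above can be sketched as follows. This is a minimal illustration, not the worker's code: `run_resilient` and its parameters are hypothetical names. Note that a bare `rescue => exc`, as also used in the #start listing, catches `StandardError` and its subclasses rather than every `Exception`.

```ruby
# Hypothetical helper, not part of Datadog::DI.
# Runs the block once per iteration and records errors instead of
# letting them terminate the loop.
def run_resilient(iterations, errors)
  iterations.times do |i|
    begin
      yield i
    rescue => exc
      # Bare rescue catches StandardError and its subclasses only.
      errors << "#{exc.class}: #{exc.message}"
    end
  end
end

errors = []
completed = []
run_resilient(3, errors) do |i|
  raise ArgumentError, "boom" if i == 1
  completed << i
end
# The failing iteration is recorded; the others still run.
```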
The worker groups the data to send into batches. The goal is to perform no more than one network operation per event type per second. There is also a limit on the length of the sending queue to prevent it from growing without bounds if upstream code generates an enormous number of events for some reason.
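One way to picture a length-limited queue that is drained in batches is the sketch below. `BoundedBatchQueue`, its capacity parameter, and the policy of dropping new items when full are all assumptions made for illustration; the worker's actual queue handling is not shown in this page.

```ruby
# Hypothetical illustration, not part of Datadog::DI.
class BoundedBatchQueue
  def initialize(capacity)
    @capacity = capacity
    @items = []
    @lock = Mutex.new
  end

  # Returns true if the item was queued, false if it was dropped because
  # the queue already holds `capacity` items (assumed drop policy).
  def push(item)
    @lock.synchronize do
      return false if @items.length >= @capacity
      @items << item
      true
    end
  end

  # Atomically removes and returns everything queued so far as one batch,
  # so a single network operation can carry all pending events.
  def drain
    @lock.synchronize do
      batch = @items
      @items = []
      batch
    end
  end
end

queue = BoundedBatchQueue.new(2)
queue.push(:status_a)
queue.push(:status_b)
queue.push(:status_c) # dropped: the queue is full
batch = queue.drain
```

Swapping the array out under the lock keeps the critical section short: the sender can then do its network I/O on `batch` without holding the mutex.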
Wake-up events are used (via ConditionVariable) to keep the thread asleep if there is no work to be done.
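A wake-up primitive with `wait(timeout)` and `signal` can be built on `Mutex` and `ConditionVariable` roughly as below. `WakeSemaphore` is a hypothetical name, and this is only a sketch of the general technique; the worker uses `Core::Semaphore`, whose implementation is not shown here and may differ.

```ruby
# Hypothetical illustration, not part of Datadog::DI.
class WakeSemaphore
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  # Blocks until #signal is called or `timeout` seconds elapse.
  # A nil timeout waits indefinitely.
  def wait(timeout = nil)
    @lock.synchronize { @cond.wait(@lock, timeout) }
  end

  # Wakes one waiting thread, if any. In this sketch a signal sent while
  # nobody is waiting is not remembered.
  def signal
    @lock.synchronize { @cond.signal }
  end
end

semaphore = WakeSemaphore.new
sleeper = Thread.new { semaphore.wait(5) }
# Signal repeatedly until the sleeper has actually woken up, since a
# signal sent before the thread starts waiting would be lost.
20.times do
  semaphore.signal
  break if sleeper.join(0.1)
end
```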
Instance Attribute Summary
- #agent_settings ⇒ Object readonly private
- #logger ⇒ Object readonly private
- #probe_notification_builder ⇒ Object readonly private
- #probe_repository ⇒ Object readonly private
- #settings ⇒ Object readonly private
- #telemetry ⇒ Object readonly private
Instance Method Summary
- #flush ⇒ Object private
  Waits for background thread to send pending notifications.
- #initialize(settings, logger, agent_settings:, probe_repository:, probe_notification_builder:, telemetry: nil) ⇒ ProbeNotifierWorker constructor private
  A new instance of ProbeNotifierWorker.
- #start ⇒ void private
  Starts the background worker thread.
- #stop(timeout = 1) ⇒ Object private
  Stops the background thread.
Constructor Details
#initialize(settings, logger, agent_settings:, probe_repository:, probe_notification_builder:, telemetry: nil) ⇒ ProbeNotifierWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ProbeNotifierWorker.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 30

    def initialize(settings, logger, agent_settings:, probe_repository:, probe_notification_builder:, telemetry: nil)
      @settings = settings
      @telemetry = telemetry
      @status_queue = []
      @snapshot_queue = []
      @agent_settings = agent_settings
      @logger = logger
      @lock = Mutex.new
      @wake = Core::Semaphore.new
      @io_in_progress = false
      @sleep_remaining = nil
      @wake_scheduled = false
      @thread = nil
      @pid = nil
      @flush = 0
      @probe_repository = probe_repository
      @probe_notification_builder = probe_notification_builder
    end
Instance Attribute Details
#agent_settings ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 53

    def agent_settings
      @agent_settings
    end
#logger ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 51

    def logger
      @logger
    end
#probe_notification_builder ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 55

    def probe_notification_builder
      @probe_notification_builder
    end
#probe_repository ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 54

    def probe_repository
      @probe_repository
    end
#settings ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 50

    def settings
      @settings
    end
#telemetry ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 52

    def telemetry
      @telemetry
    end
Instance Method Details
#flush ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for background thread to send pending notifications.
This method waits for the notification queues to become empty rather than for a particular set of notifications to be sent out. It should therefore only be called when no other thread is generating notifications in parallel; otherwise the queues may never drain and the call may not return.
This method is used by the test suite to wait until notifications have been sent out, and could be used for graceful stopping of the worker thread.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 137

    def flush
      @lock.synchronize do
        @flush += 1
      end
      begin
        loop do
          if @thread.nil? || !@thread.alive?
            return
          end

          io_in_progress, queues_empty = @lock.synchronize do
            [io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
          end

          if io_in_progress
            # If we just call Thread.pass we could be in a busy loop -
            # add a sleep.
            sleep 0.25
            next
          elsif queues_empty
            break
          else
            wake.signal
            sleep 0.25
            next
          end
        end
      ensure
        @lock.synchronize do
          @flush -= 1
        end
      end
    end
#start ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Starts the background worker thread.
The thread batches and sends probe statuses and snapshots to the agent. If the process forks, the thread is automatically restarted in the child.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 63

    def start
      return if @thread && @pid == Process.pid
      logger.trace { "di: starting probe notifier: pid #{$$}" }
      @thread = Thread.new do
        loop do
          # TODO If stop is requested, we stop immediately without
          # flushing the submissions. Should we send pending submissions
          # and then quit?
          break if @stop_requested

          # If a flush was requested, send immediately and do not
          # wait for the cooldown period.
          if @lock.synchronize { @flush } == 0
            sleep_remaining = @lock.synchronize do
              if sleep_remaining && sleep_remaining > 0
                # Recalculate how much sleep time is remaining, then sleep that long.
                set_sleep_remaining
              else
                0
              end
            end

            if sleep_remaining > 0
              # Do not need to update @wake_scheduled here because
              # wake-up is already scheduled for the earliest possible time.
              wake.wait(sleep_remaining)
              next
            end
          end

          begin
            more = maybe_send
          rescue => exc
            raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

            logger.debug { "di: error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
            telemetry&.report(exc, description: "Error in probe notifier worker")
          end
          @lock.synchronize do
            @wake_scheduled = more
          end
          wake.wait(more ? min_send_interval : nil)
        end
      end
      @pid = Process.pid
    end
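The PID comparison in the first line of #start is what makes the method both idempotent within one process and fork-aware. The standalone sketch below isolates that guard. `ForkAwareWorker` and its `starts` counter are hypothetical, and the thread body is a placeholder; it illustrates the pattern only, under the assumption that `start` is called again in the child after a fork.

```ruby
# Hypothetical illustration, not part of Datadog::DI.
class ForkAwareWorker
  attr_reader :starts

  def initialize
    @thread = nil
    @pid = nil
    @starts = 0
  end

  def start
    # A thread created before fork does not exist in the child process,
    # so a PID mismatch means the worker thread must be created again.
    return if @thread && @pid == Process.pid

    @starts += 1
    @thread = Thread.new { sleep } # placeholder for the real work loop
    @pid = Process.pid
  end

  def stop
    @thread&.kill
    @thread = nil
  end
end

worker = ForkAwareWorker.new
worker.start
worker.start # no-op: same process, thread already created
worker.stop
```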
#stop(timeout = 1) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the background thread.
Attempts a graceful stop with the specified timeout, then falls back to killing the thread using Thread#kill.
    # File 'lib/datadog/di/probe_notifier_worker.rb', line 114

    def stop(timeout = 1)
      @stop_requested = true
      logger.trace { "di: stopping probe notifier: pid #{$$}" }
      wake.signal
      if thread
        unless thread.join(timeout)
          thread.kill
        end
        @thread = nil
      end
    end
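The join-then-kill fallback relies on `Thread#join(timeout)` returning `nil` when the timeout elapses before the thread finishes. A minimal standalone sketch of just that part follows; `stop_thread` and its return symbols are hypothetical and exist only to make the two outcomes visible.

```ruby
# Hypothetical helper, not part of Datadog::DI.
def stop_thread(thread, timeout = 1)
  if thread.join(timeout)
    # The thread finished on its own within the timeout.
    :finished
  else
    # Thread#join returned nil: the timeout elapsed, so force termination
    # and wait for the kill to take effect.
    thread.kill
    thread.join
    :killed
  end
end

quick = Thread.new { :done }
stuck = Thread.new { sleep }

quick_result = stop_thread(quick, 1)
stuck_result = stop_thread(stuck, 0.05)
```

In the worker itself the graceful path depends on setting `@stop_requested` and signaling the wake-up semaphore first, so that the loop notices the request before the timeout expires.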