Class: Datadog::Core::Telemetry::Worker
- Inherits:
- Object
- Includes:
- Workers::Polling, Workers::Queue
- Defined in:
- lib/datadog/core/telemetry/worker.rb
Overview
Accumulates events and sends them to the API at a regular interval, including the heartbeat event.
Constant Summary
- DEFAULT_BUFFER_MAX_SIZE =
1000
- APP_STARTED_EVENT_RETRIES =
10
- TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)
Constants included from Workers::Polling
Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT
Instance Attribute Summary
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Attributes included from Workers::Queue
Instance Method Summary
-
#enqueue(event) ⇒ Object
Returns true if event was enqueued, nil if not.
- #failed_to_start? ⇒ Boolean
-
#flush ⇒ Object
private
Wait for the worker to send out all events that have already been queued, up to 15 seconds.
-
#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker
constructor
A new instance of Worker.
- #sent_started_event? ⇒ Boolean
-
#start ⇒ Object
Returns true if the worker thread was successfully started, false if the worker thread was not started but telemetry is enabled, and nil if telemetry is disabled.
- #stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
Methods included from Workers::Polling
#enabled=, #enabled?, included
Methods included from Workers::Queue
Constructor Details
#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/datadog/core/telemetry/worker.rb', line 22
def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  logger:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection
  @logger = logger

  @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
  @current_ticks = 0

  # Workers::Polling settings
  self.enabled = enabled
  # Workers::IntervalLoop settings
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size

  self.buffer = buffer_klass.new(@buffer_size)
end
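The constructor derives the number of loop ticks per heartbeat from the two interval arguments. The following is a minimal sketch of that arithmetic only; the helper name `ticks_per_heartbeat` and the sample interval values are assumptions chosen for illustration, not library defaults.

```ruby
# Hypothetical helper that mirrors the division performed in the constructor.
# It is not part of the Datadog API.
def ticks_per_heartbeat(heartbeat_interval_seconds, metrics_aggregation_interval_seconds)
  (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
end

# Sample values below are illustrative assumptions.
puts ticks_per_heartbeat(60.0, 10.0) # six aggregation ticks per heartbeat
puts ticks_per_heartbeat(60.0, 25.0) # 2.4 is truncated toward zero by to_i
puts ticks_per_heartbeat(5.0, 10.0)  # a heartbeat shorter than one tick truncates to 0
```

Because `to_i` truncates, a heartbeat interval that is not a whole multiple of the aggregation interval is rounded down to a whole number of ticks.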
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/datadog/core/telemetry/worker.rb', line 53
def logger
  @logger
end
Instance Method Details
#enqueue(event) ⇒ Object
Returns true if the event was enqueued, nil if not. While returning false may seem more reasonable, the only reason for not enqueueing an event (presently) is that telemetry is disabled altogether, and in this case other methods return nil.
# File 'lib/datadog/core/telemetry/worker.rb', line 77
def enqueue(event)
  return if !enabled? || forked?

  buffer.push(event)
  true
end
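Since `#enqueue` returns true or nil rather than true or false, callers are probably best served by testing the result for truthiness. The sketch below illustrates that contract with a stand-in class; `SketchWorker` and `report` are hypothetical names, and the stand-in omits the fork check and the real buffer.

```ruby
# SketchWorker is a hypothetical stand-in, not the real Datadog worker.
class SketchWorker
  def initialize(enabled:)
    @enabled = enabled
    @buffer = []
  end

  def enabled?
    @enabled
  end

  # Mirrors the documented contract: true when queued, nil when skipped.
  def enqueue(event)
    return unless enabled?

    @buffer.push(event)
    true
  end
end

# Hypothetical caller: relies on truthiness instead of comparing against false.
def report(worker, event)
  worker.enqueue(event) ? "queued" : "skipped"
end

puts report(SketchWorker.new(enabled: true), :heartbeat)
puts report(SketchWorker.new(enabled: false), :heartbeat)
```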
#failed_to_start? ⇒ Boolean
# File 'lib/datadog/core/telemetry/worker.rb', line 88
def failed_to_start?
  TELEMETRY_STARTED_ONCE.failed?
end
#flush ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
Wait for the worker to send out all events that have already been queued, up to 15 seconds. Returns whether all events have been flushed.
# File 'lib/datadog/core/telemetry/worker.rb', line 97
def flush
  return true unless enabled? || !run_loop?

  started = Utils::Time.get_time
  loop do
    # The AppStarted event is triggered by the worker itself,
    # from the worker thread. As such the main thread has no way
    # to delay itself until that event is queued and we need some
    # way to wait until that event is sent out to assert on it in
    # the test suite. Check the run once flag which *should*
    # indicate the event has been queued (at which point our queue
    # depth check should wait until it's sent).
    # This is still a hack because the flag can be overridden
    # either way with or without the event being sent out.
    # Note that if the AppStarted sending fails, this check
    # will return false and flushing will be blocked until the
    # 15 second timeout.
    # Note that the first wait interval between telemetry event
    # sending is 10 seconds, the timeout needs to be strictly
    # greater than that.
    return true if buffer.empty? && !in_iteration? && TELEMETRY_STARTED_ONCE.success?

    sleep 0.5

    return false if Utils::Time.get_time - started > 15
  end
end
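The wait in `#flush` follows a common poll-until-deadline pattern: check a condition, sleep, and give up once the timeout has elapsed. The following is a generic sketch of that pattern rather than the library's code; `wait_until`, the intervals, and the background drainer thread are all assumptions made for illustration, and the monotonic clock call stands in for `Utils::Time.get_time`.

```ruby
# Hypothetical helper: polls the block until it returns a truthy value
# or the timeout elapses. Returns true on success, false on timeout.
def wait_until(timeout:, interval:)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    return true if yield

    sleep interval

    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) - started > timeout
  end
end

# Stand-in for the worker's buffer: a thread-safe queue with three events.
queue = Queue.new
3.times { |i| queue << i }

# Stand-in for the worker thread: drains one event every 10 ms.
drainer = Thread.new do
  3.times do
    sleep 0.01
    queue.pop
  end
end

flushed = wait_until(timeout: 1, interval: 0.02) { queue.empty? }
drainer.join
puts "flushed: #{flushed}"
```

As in the listing above, the timeout is only checked after a sleep, so the total wait can exceed the timeout by up to one polling interval.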
#sent_started_event? ⇒ Boolean
# File 'lib/datadog/core/telemetry/worker.rb', line 84
def sent_started_event?
  TELEMETRY_STARTED_ONCE.success?
end
#start ⇒ Object
Returns true if the worker thread was successfully started, false if the worker thread was not started but telemetry is enabled, and nil if telemetry is disabled.
# File 'lib/datadog/core/telemetry/worker.rb', line 58
def start
  return if !enabled? || forked?

  # starts async worker
  # perform should return true if thread was actually started,
  # false otherwise
  perform
end
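Because `#start` has three possible outcomes, a caller that needs to tell them apart has to distinguish false from nil explicitly. One possible way to do so is sketched below; `describe_start` is a hypothetical helper, not part of the library, and the messages are illustrative.

```ruby
# Hypothetical helper that maps the documented return values of #start
# to a human-readable status.
def describe_start(result)
  case result
  when true then "worker thread started"
  when false then "telemetry enabled, but worker thread not started"
  when nil then "telemetry disabled"
  else raise ArgumentError, "unexpected result: #{result.inspect}"
  end
end

[true, false, nil].each { |result| puts describe_start(result) }
```

Note that the listing above also returns nil when `forked?` is true, so nil does not necessarily mean that telemetry was disabled by configuration.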
#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
# File 'lib/datadog/core/telemetry/worker.rb', line 67
def stop(force_stop = false, timeout = @shutdown_timeout)
  buffer.close if running?

  super
end