Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including a periodic heartbeat event.

Constant Summary

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10
TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)
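`TELEMETRY_STARTED_ONCE` guards the AppStarted event so that it is successfully sent at most once, presumably allowing up to `APP_STARTED_EVENT_RETRIES` attempts. `#sent_started_event?` and `#failed_to_start?` simply query it. The sketch below illustrates that run-once-with-retry-limit idea. The class name `OnceSuccessfulSketch`, its `run` method, and all of its internals are hypothetical and are not the actual `Utils::OnlyOnceSuccessful` code. Only the `success?` and `failed?` predicate names are taken from this page.

```ruby
# Hypothetical illustration of a "run until it succeeds once, with a limit
# on attempts" guard. NOT the real Utils::OnlyOnceSuccessful implementation.
class OnceSuccessfulSketch
  def initialize(limit)
    @limit = limit      # maximum number of attempts before giving up
    @attempts = 0
    @success = false
    @mutex = Mutex.new  # attempts may come from different threads
  end

  # Runs the block unless a previous attempt succeeded or the limit is
  # exhausted. A truthy block result counts as success.
  def run
    @mutex.synchronize do
      return if @success || @attempts >= @limit

      @attempts += 1
      @success = true if yield
    end
  end

  def success?
    @mutex.synchronize { @success }
  end

  # True once every allowed attempt has been used without a success.
  def failed?
    @mutex.synchronize { !@success && @attempts >= @limit }
  end
end

once = OnceSuccessfulSketch.new(3)
once.run { false }               # attempt 1 fails
once.run { true }                # attempt 2 succeeds
once.run { raise "never runs" }  # skipped: already succeeded
once.success?  # => true
once.failed?   # => false
```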

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes included from Workers::Queue

#buffer

Instance Method Summary

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/datadog/core/telemetry/worker.rb', line 22

def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  logger:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection
  @logger = logger

  @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
  @current_ticks = 0

  # Workers::Polling settings
  self.enabled = enabled
  # Workers::IntervalLoop settings
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size

  self.buffer = buffer_klass.new(@buffer_size)
end
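The constructor derives `@ticks_per_heartbeat` by dividing the heartbeat interval by the metrics aggregation interval (which also serves as the loop's base interval) and truncating with `to_i`. The arithmetic can be checked with a small standalone function. The interval values are hypothetical, and `heartbeat_ticks` models an assumed scheduling rule, since the loop body that consumes `@current_ticks` is not shown on this page.

```ruby
# Same expression as in Worker#initialize: how many aggregation ticks fit
# into one heartbeat interval, truncated to an Integer.
def ticks_per_heartbeat(heartbeat_interval_seconds, metrics_aggregation_interval_seconds)
  (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
end

# ASSUMED scheduling rule (not shown in the docs): every tick increments a
# counter, and a heartbeat goes out when the counter reaches per_heartbeat.
# Returns the tick numbers at which a heartbeat would be sent.
def heartbeat_ticks(total_ticks, per_heartbeat)
  current = 0
  sent_at = []
  1.upto(total_ticks) do |tick|
    current += 1
    next unless current >= per_heartbeat

    sent_at << tick
    current = 0
  end
  sent_at
end

ticks_per_heartbeat(60, 10)   # => 6
ticks_per_heartbeat(60.0, 7)  # => 8  (60.0 / 7 is about 8.57, truncated)
ticks_per_heartbeat(60, 7)    # => 8  (Integer division already truncates)
heartbeat_ticks(13, 6)        # => [6, 12]
```

A heartbeat interval shorter than the aggregation interval truncates to 0 ticks, so under the assumed rule a heartbeat would go out on every tick.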

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/datadog/core/telemetry/worker.rb', line 53

def logger
  @logger
end

Instance Method Details

#enqueue(event) ⇒ Object

Returns true if the event was enqueued and nil if not. Returning false might seem more natural, but currently the only reasons an event is not enqueued are that telemetry is disabled altogether or that the worker is running in a forked process, and in those cases the other methods also return nil.



# File 'lib/datadog/core/telemetry/worker.rb', line 77

def enqueue(event)
  return if !enabled? || forked?

  buffer.push(event)
  true
end
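The guard in `#enqueue` means callers only ever see `true` or `nil`. A minimal stand-in reproduces that contract. `EnqueueSketch` and its PID-based `forked?` check are assumptions for illustration, since the real `forked?` comes from the included worker modules and is not documented here, and a plain Array replaces the real buffer class.

```ruby
# Hypothetical stand-in mirroring the guard in Worker#enqueue.
class EnqueueSketch
  attr_reader :buffer

  def initialize(enabled:)
    @enabled = enabled
    @pid = Process.pid  # PID of the process that created this object
    @buffer = []        # plain Array in place of the real buffer class
  end

  def enabled?
    @enabled
  end

  # ASSUMPTION: treat a PID change as "we are now in a forked child".
  def forked?
    Process.pid != @pid
  end

  # true when the event was buffered, nil when it was refused
  def enqueue(event)
    return if !enabled? || forked?

    buffer.push(event)
    true
  end
end

EnqueueSketch.new(enabled: true).enqueue(:app_heartbeat)   # => true
EnqueueSketch.new(enabled: false).enqueue(:app_heartbeat)  # => nil
```

Because a refused event yields `nil` rather than `false`, callers can treat any falsy result as "not queued".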

#failed_to_start? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/datadog/core/telemetry/worker.rb', line 88

def failed_to_start?
  TELEMETRY_STARTED_ONCE.failed?
end

#flush ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits up to 15 seconds for the worker to send out all events that have already been queued. Returns true if all events were flushed and false if the timeout was reached first.



# File 'lib/datadog/core/telemetry/worker.rb', line 97

def flush
  return true unless enabled? || !run_loop?

  started = Utils::Time.get_time
  loop do
    # The AppStarted event is triggered by the worker itself,
    # from the worker thread. As such the main thread has no way
    # to delay itself until that event is queued and we need some
    # way to wait until that event is sent out to assert on it in
    # the test suite. Check the run once flag which *should*
    # indicate the event has been queued (at which point our queue
    # depth check should wait until it's sent).
    # This is still a hack because the flag can be overridden
    # either way with or without the event being sent out.
    # Note that if the AppStarted sending fails, this check
    # will return false and flushing will be blocked until the
    # 15 second timeout.
    # Note that the first wait interval between telemetry event
    # sending is 10 seconds, the timeout needs to be strictly
    # greater than that.
    return true if buffer.empty? && !in_iteration? && TELEMETRY_STARTED_ONCE.success?

    sleep 0.5

    return false if Utils::Time.get_time - started > 15
  end
end
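`#flush` is a poll-until-deadline loop: check a condition, sleep 0.5 seconds, and give up once more than 15 seconds have elapsed. The generic sketch below isolates that pattern. `wait_until` and its keyword arguments are hypothetical, and `Process.clock_gettime(Process::CLOCK_MONOTONIC)` stands in for `Utils::Time.get_time`, which is assumed here to be a monotonic clock.

```ruby
# Hypothetical helper reproducing the check / sleep / deadline structure
# of Worker#flush for an arbitrary condition given as a block.
def wait_until(timeout: 15, interval: 0.5)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    return true if yield  # condition satisfied: stop waiting

    sleep interval

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    return false if elapsed > timeout  # deadline passed
  end
end

queue = Thread::Queue.new
queue << :app_started
drainer = Thread.new { sleep 0.1; queue.pop }            # empties the queue shortly
wait_until(timeout: 2, interval: 0.05) { queue.empty? }  # => true
drainer.join
wait_until(timeout: 0.2, interval: 0.05) { false }       # => false
```

As in `#flush`, the condition is checked before the first sleep, so an already-satisfied condition returns immediately, and the deadline check runs only after sleeping, so the loop may overshoot the timeout by up to one interval.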

#sent_started_event? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/datadog/core/telemetry/worker.rb', line 84

def sent_started_event?
  TELEMETRY_STARTED_ONCE.success?
end

#start ⇒ Object

Returns true if the worker thread was successfully started, false if telemetry is enabled but the worker thread was not started, and nil if telemetry is disabled.



# File 'lib/datadog/core/telemetry/worker.rb', line 58

def start
  return if !enabled? || forked?

  # starts async worker
  # perform should return true if thread was actually started,
  # false otherwise
  perform
end
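Because `#start` can return `true`, `false`, or `nil`, a caller that wants to log or react to the outcome has to distinguish all three. The helper below is a hypothetical illustration of that mapping. `describe_start`, its result symbols, and the `StubWorker` struct are not part of the library.

```ruby
# Hypothetical caller-side interpretation of Worker#start's return value.
def describe_start(worker)
  case worker.start
  when true  then :started
  when false then :not_started  # telemetry enabled, but no thread started
  when nil   then :disabled     # telemetry disabled (or running in a fork)
  else :unexpected
  end
end

# Minimal stub whose #start returns a fixed value.
StubWorker = Struct.new(:result) do
  def start
    result
  end
end

describe_start(StubWorker.new(true))  # => :started
describe_start(StubWorker.new(nil))   # => :disabled
```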

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



# File 'lib/datadog/core/telemetry/worker.rb', line 67

def stop(force_stop = false, timeout = @shutdown_timeout)
  buffer.close if running?

  super
end
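`#stop` closes the buffer when the worker is running and then calls `super`, presumably the `Workers::Polling` implementation, which is not shown here. The sketch below demonstrates why closing first is a reasonable ordering, using Ruby's `Thread::Queue` as an assumed stand-in for the real buffer class: a closed queue rejects new pushes, still hands out events that were already queued, and makes `pop` return `nil` once drained, so the consumer thread can finish and be joined with a timeout. The event symbols are hypothetical labels.

```ruby
# Assumed analogue of "close the buffer, then stop the worker thread",
# built on Thread::Queue. Not the library's actual stop logic.
buffer = Thread::Queue.new
sent = []

consumer = Thread.new do
  # pop blocks while the queue is open and empty; it returns nil once the
  # queue is closed and drained, which ends the loop without a sentinel.
  while (event = buffer.pop)
    sent << event
  end
end

buffer << :app_heartbeat
buffer << :app_closing
buffer.close                 # further pushes raise ClosedQueueError
finished = consumer.join(5)  # waits at most 5 s; nil would mean timeout

sent      # => [:app_heartbeat, :app_closing]
finished  # the consumer Thread itself (truthy), since it exited in time
```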