Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
  • Object
show all
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including the heartbeat event.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary collapse

Attributes included from Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker

Returns a new instance of Worker.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/datadog/core/telemetry/worker.rb', line 20

# Configures a telemetry worker. Nothing in this constructor starts the
# worker; see #start.
#
# @param heartbeat_interval_seconds [Numeric] heartbeat period; divided by
#   metrics_aggregation_interval_seconds to derive @ticks_per_heartbeat
# @param metrics_aggregation_interval_seconds [Numeric] used as the loop base
#   interval of the worker
# @param emitter [Object] stored in @emitter
# @param metrics_manager [Object] stored in @metrics_manager
# @param dependency_collection [Object] stored in @dependency_collection
# @param logger [Object] stored in @logger
# @param enabled [Boolean] forwarded to the enabled= setter
# @param shutdown_timeout [Object] default timeout used by #stop
# @param buffer_size [Integer] size passed to the buffer class constructor
def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  logger:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  # Collaborators handed in by the caller.
  @logger = logger
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection

  # The loop base interval is set to metrics_aggregation_interval_seconds
  # below, so this is how many of those intervals fit in one heartbeat
  # interval (truncated to an integer).
  intervals_per_heartbeat = heartbeat_interval_seconds / metrics_aggregation_interval_seconds
  @ticks_per_heartbeat = intervals_per_heartbeat.to_i
  @current_ticks = 0

  # Workers::Polling settings
  self.enabled = enabled
  # Workers::IntervalLoop settings
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size

  # Workers::Queue buffer, sized by buffer_size.
  self.buffer = buffer_klass.new(buffer_size)

  # Guard for the initial event, constructed with APP_STARTED_EVENT_RETRIES.
  @initial_event_once = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)
end

Instance Attribute Details

#initial_event ⇒ Object (readonly)

Returns the value of attribute initial_event.



55
56
57
# File 'lib/datadog/core/telemetry/worker.rb', line 55

# Reader for the initial event handed to #start.
#
# @return [Object, nil] the value of @initial_event; nil until #start has
#   assigned it (the constructor does not set it)
def initial_event
  @initial_event
end

#initial_event_once ⇒ Object (readonly)

Returns the value of attribute initial_event_once.



54
55
56
# File 'lib/datadog/core/telemetry/worker.rb', line 54

# Reader for the guard object that tracks the initial event.
#
# @return [Utils::OnlyOnceSuccessful] the instance built in the constructor
#   with APP_STARTED_EVENT_RETRIES
def initial_event_once
  @initial_event_once
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



53
54
55
# File 'lib/datadog/core/telemetry/worker.rb', line 53

# Reader for the logger supplied to the constructor.
#
# @return [Object] the value of @logger
def logger
  @logger
end

Instance Method Details

#enqueue(event) ⇒ Object

Returns true if the event was enqueued, nil if not. While returning false may seem more reasonable, the only reason for not enqueueing an event (presently) is that telemetry is disabled altogether (the code also returns nil when forked? is true), and in this case other methods return nil.



81
82
83
84
85
86
# File 'lib/datadog/core/telemetry/worker.rb', line 81

# Pushes an event onto the worker's buffer.
#
# @param event [Object] event to push
# @return [true, nil] true when the event was pushed; nil when enabled? is
#   false or forked? is true (nothing is pushed in those cases)
def enqueue(event)
  return unless enabled?
  return if forked?

  buffer.push(event)
  true
end

#failed_initial_event? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/datadog/core/telemetry/worker.rb', line 92

# Reports whether the initial-event guard is in its failed state.
#
# NOTE(review): presumably this becomes true once the
# APP_STARTED_EVENT_RETRIES attempts are exhausted — confirm against
# Utils::OnlyOnceSuccessful.
#
# @return [Boolean] the result of initial_event_once.failed?
def failed_initial_event?
  initial_event_once.failed?
end

#flush ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for the worker to send out all events that have already been queued, up to 15 seconds. Returns whether all events have been flushed.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/datadog/core/telemetry/worker.rb', line 105

# Waits for the worker to send out all events that have already been
# queued, for up to 15 seconds, polling every half second.
#
# @api private
# @return [Boolean] true when there was nothing left to wait for, false when
#   the 15 second limit elapsed first
def flush
  # NOTE(review): this guard only returns early when the worker is disabled
  # while run_loop? is true; a worker that is disabled with run_loop? false
  # falls through to the wait loop below. Confirm that is intended.
  return true if !enabled? && run_loop?

  timeout_seconds = 15
  poll_interval_seconds = 0.5
  wait_started_at = Utils::Time.get_time

  loop do
    # The AppStarted event is produced by the worker itself, on the worker
    # thread, so the calling thread cannot hold off until that event has
    # been queued. The test suite still needs a way to wait for it to be
    # sent before asserting on it, so the run-once flag is consulted: it
    # *should* mean the event has been queued, after which the queue depth
    # check waits until it is sent.
    # This remains a hack, because the flag can be overridden either way
    # regardless of whether the event was actually sent.
    # If sending AppStarted fails, this condition stays false and flushing
    # blocks until the 15 second timeout.
    # The first wait interval between telemetry sends is 10 seconds, so the
    # timeout must be strictly greater than that.
    everything_sent = buffer.empty? && !in_iteration? && sent_initial_event?
    return true if everything_sent

    sleep poll_interval_seconds

    return false if Utils::Time.get_time - wait_started_at > timeout_seconds
  end
end

#need_initial_event? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/datadog/core/telemetry/worker.rb', line 96

# Tells whether the initial event is still outstanding, i.e. it has neither
# been sent nor been marked as failed.
#
# @return [Boolean] false as soon as sent_initial_event? or
#   failed_initial_event? is true, true otherwise
def need_initial_event?
  !(sent_initial_event? || failed_initial_event?)
end

#sent_initial_event? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/datadog/core/telemetry/worker.rb', line 88

# Reports whether the initial-event guard is in its success state.
#
# NOTE(review): presumably this means the initial event was sent
# successfully — confirm against Utils::OnlyOnceSuccessful.
#
# @return [Boolean] the result of initial_event_once.success?
def sent_initial_event?
  initial_event_once.success?
end

#start(initial_event) ⇒ Object

Returns true if the worker thread is successfully started, false if the worker thread was not started but telemetry is enabled, nil if telemetry is disabled.



60
61
62
63
64
65
66
67
68
69
# File 'lib/datadog/core/telemetry/worker.rb', line 60

# Records the initial event and launches the asynchronous worker.
#
# @param initial_event [Object] stored in @initial_event before the worker
#   is launched
# @return [Object, nil] nil when enabled? is false or forked? is true;
#   otherwise whatever perform returns (expected to be true when the thread
#   was actually started and false when it was not)
def start(initial_event)
  startable = enabled? && !forked?
  return unless startable

  @initial_event = initial_event

  # Launches the async worker; its result is handed back to the caller.
  perform
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



71
72
73
74
75
# File 'lib/datadog/core/telemetry/worker.rb', line 71

# Stops the worker: closes the buffer when running? is true, then hands both
# arguments to the inherited stop.
#
# NOTE(review): closing the buffer first presumably releases a worker that
# is waiting on the queue — confirm against Workers::Queue.
#
# @param force_stop [Boolean] passed through to the inherited stop
# @param timeout [Object] passed through to the inherited stop; defaults to
#   the shutdown timeout given to the constructor
# @return [Object] whatever the inherited stop returns
def stop(force_stop = false, timeout = @shutdown_timeout)
  worker_running = running?
  buffer.close if worker_running

  super(force_stop, timeout)
end