Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
  • Object
show all
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including heartbeat event.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10
TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary collapse

Attributes included from Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/datadog/core/telemetry/worker.rb', line 22

def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  logger:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection
  @logger = logger

  @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
  @current_ticks = 0

  # Workers::Polling settings
  self.enabled = enabled
  # Workers::IntervalLoop settings
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size

  self.buffer = buffer_klass.new(@buffer_size)
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



53
54
55
# File 'lib/datadog/core/telemetry/worker.rb', line 53

def logger
  @logger
end

Instance Method Details

#enqueue(event) ⇒ Object



68
69
70
71
72
# File 'lib/datadog/core/telemetry/worker.rb', line 68

def enqueue(event)
  return if !enabled? || forked?

  buffer.push(event)
end

#failed_to_start?Boolean



78
79
80
# File 'lib/datadog/core/telemetry/worker.rb', line 78

def failed_to_start?
  TELEMETRY_STARTED_ONCE.failed?
end

#sent_started_event?Boolean



74
75
76
# File 'lib/datadog/core/telemetry/worker.rb', line 74

def sent_started_event?
  TELEMETRY_STARTED_ONCE.success?
end

#startObject



55
56
57
58
59
60
# File 'lib/datadog/core/telemetry/worker.rb', line 55

def start
  return if !enabled? || forked?

  # starts async worker
  perform
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



62
63
64
65
66
# File 'lib/datadog/core/telemetry/worker.rb', line 62

def stop(force_stop = false, timeout = @shutdown_timeout)
  buffer.close if running?

  super
end