Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
  • Object
show all
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including heartbeat event.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10
TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary collapse

Attributes included from Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, logger:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker

Returns a new instance of Worker.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/datadog/core/telemetry/worker.rb', line 22

def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  logger:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection
  @logger = logger

  @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
  @current_ticks = 0

  # Workers::Polling settings
  self.enabled = enabled
  # Workers::IntervalLoop settings
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size

  self.buffer = buffer_klass.new(@buffer_size)
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



53
54
55
# File 'lib/datadog/core/telemetry/worker.rb', line 53

def logger
  @logger
end

Instance Method Details

#enqueue(event) ⇒ Object



68
69
70
71
72
# File 'lib/datadog/core/telemetry/worker.rb', line 68

def enqueue(event)
  return if !enabled? || forked?

  buffer.push(event)
end

#failed_to_start?Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/datadog/core/telemetry/worker.rb', line 78

def failed_to_start?
  TELEMETRY_STARTED_ONCE.failed?
end

#sent_started_event?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/datadog/core/telemetry/worker.rb', line 74

def sent_started_event?
  TELEMETRY_STARTED_ONCE.success?
end

#startObject



55
56
57
58
59
60
# File 'lib/datadog/core/telemetry/worker.rb', line 55

def start
  return if !enabled? || forked?

  # starts async worker
  perform
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



62
63
64
65
66
# File 'lib/datadog/core/telemetry/worker.rb', line 62

def stop(force_stop = false, timeout = @shutdown_timeout)
  buffer.close if running?

  super
end