Class: Datadog::OpenFeature::Exposures::Worker

Inherits:
Object
  • Object
show all
Includes:
Core::Workers::Polling, Core::Workers::Queue
Defined in:
lib/datadog/open_feature/exposures/worker.rb

Overview

This class is responsible for sending exposures to the Agent

Constant Summary collapse

GRACEFUL_SHUTDOWN_EXTRA_SECONDS =
5
GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS =
0.5
DEFAULT_FLUSH_INTERVAL_SECONDS =
30
DEFAULT_BUFFER_LIMIT =
Buffer::DEFAULT_LIMIT

Constants included from Core::Workers::Polling

Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes included from Core::Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Core::Workers::Polling

#enabled=, #enabled?, included

Methods included from Core::Workers::Queue

included, #work_pending?

Constructor Details

#initialize(settings:, transport:, telemetry:, logger:, flush_interval_seconds: DEFAULT_FLUSH_INTERVAL_SECONDS, buffer_limit: DEFAULT_BUFFER_LIMIT) ⇒ Worker

Returns a new instance of Worker.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/datadog/open_feature/exposures/worker.rb', line 24

def initialize(
  settings:,
  transport:,
  telemetry:,
  logger:,
  flush_interval_seconds: DEFAULT_FLUSH_INTERVAL_SECONDS,
  buffer_limit: DEFAULT_BUFFER_LIMIT
)
  @logger = logger
  @transport = transport
  @telemetry = telemetry
  @batch_builder = BatchBuilder.new(settings)
  @buffer_limit = buffer_limit

  self.buffer = Buffer.new(buffer_limit)
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART
  self.loop_base_interval = flush_interval_seconds
  self.enabled = true
end

Instance Method Details

#dequeueObject



63
64
65
# File 'lib/datadog/open_feature/exposures/worker.rb', line 63

def dequeue
  [buffer.pop, buffer.dropped_count]
end

#enqueue(event) ⇒ Object



56
57
58
59
60
61
# File 'lib/datadog/open_feature/exposures/worker.rb', line 56

def enqueue(event)
  buffer.push(event)
  start unless running?

  true
end

#graceful_shutdownObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/datadog/open_feature/exposures/worker.rb', line 72

def graceful_shutdown
  return false unless enabled? || !run_loop?

  self.enabled = false

  started = Core::Utils::Time.get_time
  wait_time = loop_base_interval + GRACEFUL_SHUTDOWN_EXTRA_SECONDS

  loop do
    break if buffer.empty? && !in_iteration?

    sleep(GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS)
    break if Core::Utils::Time.get_time - started > wait_time
  end

  stop(true)
end

#perform(*args) ⇒ Object



67
68
69
70
# File 'lib/datadog/open_feature/exposures/worker.rb', line 67

def perform(*args)
  events, dropped = args
  send_events(Array(events), dropped.to_i)
end

#startObject



44
45
46
47
48
# File 'lib/datadog/open_feature/exposures/worker.rb', line 44

def start
  return if !enabled? || running?

  perform
end

#stop(force_stop = false, timeout = Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT) ⇒ Object



50
51
52
53
54
# File 'lib/datadog/open_feature/exposures/worker.rb', line 50

def stop(force_stop = false, timeout = Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
  buffer.close if running?

  super
end