Class: Datadog::OpenFeature::Exposures::Worker
Overview
This class is responsible for sending exposures to the Agent
Constant Summary
collapse
- GRACEFUL_SHUTDOWN_EXTRA_SECONDS =
5
- GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS =
0.5
- DEFAULT_FLUSH_INTERVAL_SECONDS =
30
- DEFAULT_BUFFER_LIMIT =
Buffer::DEFAULT_LIMIT
Constants included from Core::Workers::Polling
Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT
Instance Attribute Summary
#buffer
Instance Method Summary
collapse
Methods included from Core::Workers::Polling
#enabled=, #enabled?, included
Methods included from Core::Workers::Queue
included, #work_pending?
Constructor Details
#initialize(settings:, transport:, telemetry:, logger:, flush_interval_seconds: DEFAULT_FLUSH_INTERVAL_SECONDS, buffer_limit: DEFAULT_BUFFER_LIMIT) ⇒ Worker
Returns a new instance of Worker.
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 24
# Sets up the worker: keeps references to its collaborators, creates the
# exposure buffer and configures the polling loop.
#
# @param settings handed unchanged to BatchBuilder.new
# @param transport stored in @transport
# @param telemetry stored in @telemetry
# @param logger stored in @logger
# @param flush_interval_seconds value assigned to loop_base_interval
#   (defaults to DEFAULT_FLUSH_INTERVAL_SECONDS)
# @param buffer_limit limit passed to Buffer.new (defaults to DEFAULT_BUFFER_LIMIT)
def initialize(
  settings:,
  transport:,
  telemetry:,
  logger:,
  flush_interval_seconds: DEFAULT_FLUSH_INTERVAL_SECONDS,
  buffer_limit: DEFAULT_BUFFER_LIMIT
)
  # Collaborators and configuration kept on the instance.
  @buffer_limit = buffer_limit
  @batch_builder = BatchBuilder.new(settings)
  @telemetry = telemetry
  @transport = transport
  @logger = logger

  # Worker wiring: buffer first, then fork policy, loop interval and the enabled flag.
  self.buffer = Buffer.new(@buffer_limit)
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART
  self.loop_base_interval = flush_interval_seconds
  self.enabled = true
end
|
Instance Method Details
#dequeue ⇒ Object
63
64
65
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 63
# Takes the pending work out of the buffer.
#
# @return [Array] a two-element array: the result of `buffer.pop` followed by
#   `buffer.dropped_count`
def dequeue
  pending = buffer.pop
  dropped = buffer.dropped_count

  [pending, dropped]
end
|
#enqueue(event) ⇒ Object
56
57
58
59
60
61
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 56
# Pushes an event onto the buffer and starts the worker when it is not
# running yet.
#
# @param event the object to push onto the buffer
# @return [true] always
def enqueue(event)
  buffer.push(event)

  # The running check happens after the push, so the event is already buffered
  # when the worker is started.
  if !running?
    start
  end

  true
end
|
#graceful_shutdown ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 72
# Disables the worker, gives it a bounded amount of time to drain the buffer,
# then force-stops it.
#
# The wait ends as soon as the buffer is empty and no iteration is in
# progress, or once more than `loop_base_interval` plus the extra grace period
# has elapsed. The condition is re-checked every
# GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS.
#
# @return [false] when the worker is already disabled while `run_loop?` is true
# @return [Object] otherwise, whatever `stop(true)` returns
def graceful_shutdown
  return false if !enabled? && run_loop?

  self.enabled = false

  started = Core::Utils::Time.get_time
  # NOTE(review): the right-hand operand was missing in this listing, which made
  # the `loop` block itself the operand and left the deadline unset.
  # GRACEFUL_SHUTDOWN_EXTRA_SECONDS (the 5-second constant) is assumed — confirm
  # against the class constants.
  wait_time = loop_base_interval + GRACEFUL_SHUTDOWN_EXTRA_SECONDS

  loop do
    break if buffer.empty? && !in_iteration?

    sleep(GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS)

    # Give up once the deadline has passed, even if events are still pending.
    break if Core::Utils::Time.get_time - started > wait_time
  end

  stop(true)
end
|
#perform(*args) ⇒ Object
67
68
69
70
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 67
# Sends one batch of work through `send_events`.
#
# @param args [Array] the first element is the event(s), the second the dropped
#   count; further elements are ignored
# @return [Object] whatever `send_events` returns
def perform(*args)
  batch = Array(args[0]) # nil becomes [], a single object becomes [object]
  dropped_total = args[1].to_i # nil becomes 0

  send_events(batch, dropped_total)
end
|
#start ⇒ Object
44
45
46
47
48
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 44
# Calls `perform` when the worker is enabled and not already running.
#
# @return [nil] when the worker is disabled or already running
# @return [Object] otherwise, whatever `perform` returns
def start
  perform if enabled? && !running?
end
|
#stop(force_stop = false, timeout = Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT) ⇒ Object
50
51
52
53
54
|
# File 'lib/datadog/open_feature/exposures/worker.rb', line 50
# Closes the buffer when the worker is running, then delegates to the
# inherited `stop`.
#
# @param force_stop forwarded unchanged to the inherited `stop`
# @param timeout forwarded unchanged to the inherited `stop`
#   (defaults to Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
# @return [Object] whatever the inherited `stop` returns
def stop(force_stop = false, timeout = Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
  if running?
    buffer.close
  end

  super(force_stop, timeout)
end
|