Class: Datadog::Tracing::Workers::AsyncTraceWriter

Inherits:
TraceWriter show all
Includes:
Core::Workers::Polling, Core::Workers::Queue
Defined in:
lib/datadog/tracing/workers/trace_writer.rb

Overview

Writes traces to transport asynchronously, using a thread & buffer.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
FORK_POLICY_ASYNC =
:async
FORK_POLICY_SYNC =
:sync

Constants included from Core::Workers::Polling

Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary collapse

Attributes included from Core::Workers::Queue

#buffer

Attributes inherited from TraceWriter

#agent_settings, #logger, #transport

Attributes inherited from Core::Worker

#task

Instance Method Summary collapse

Methods included from Core::Workers::Polling

#enabled=, #enabled?, included

Methods included from Core::Workers::Queue

included

Methods inherited from TraceWriter

#flush_completed, #flush_traces, #process_traces, #write_traces

Constructor Details

#initialize(options = {}) ⇒ AsyncTraceWriter

Returns a new instance of AsyncTraceWriter.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 93

# Builds a trace writer that buffers traces and is set up to run asynchronously.
#
# The +options+ hash is forwarded untouched to TraceWriter#initialize (bare
# +super+) and is then read for the keys below:
#
# * +:enabled+         - passed to #enabled=; defaults to +true+.
# * +:fork_policy+     - passed to #fork_policy=; defaults to FORK_POLICY_ASYNC.
# * +:interval+        - passed to #loop_base_interval= only when the key is present.
# * +:back_off_ratio+  - passed to #loop_back_off_ratio= only when the key is present.
# * +:back_off_max+    - passed to #loop_back_off_max= only when the key is present.
# * +:buffer_size+     - capacity handed to TraceBuffer.new; defaults to
#                        DEFAULT_BUFFER_MAX_SIZE.
# * +:shutdown_timeout+ - default timeout for #stop; defaults to
#                         Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT.
#
# @param options [Hash] writer configuration (see the keys above)
def initialize(options = {})
  # Bare `super` hands the very same options hash to TraceWriter#initialize.
  super

  # Polling: enabled unless the caller says otherwise.
  self.enabled = options.fetch(:enabled, true)

  # Async thread: start in asynchronous mode, then register the fork policy.
  @async = true
  self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC)

  # Interval loop: each setter is invoked only when its option key is present,
  # so an omitted option never overwrites the loop's existing value.
  if options.key?(:interval)
    self.loop_base_interval = options[:interval]
  end
  if options.key?(:back_off_ratio)
    self.loop_back_off_ratio = options[:back_off_ratio]
  end
  if options.key?(:back_off_max)
    self.loop_back_off_max = options[:back_off_max]
  end

  # Queue: remember the capacity so #after_fork can build an identical buffer.
  capacity = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  @buffer_size = capacity
  self.buffer = TraceBuffer.new(capacity)

  # Default timeout used by #stop when the caller does not pass one.
  @shutdown_timeout = options.fetch(:shutdown_timeout, Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
end

Instance Attribute Details

#async=(value) ⇒ Object (writeonly)

Sets the attribute async

Parameters:

  • value

    the value to set the attribute async to.



90
91
92
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 90

# Sets the writer's asynchronous-mode flag.
#
# The value is stored in @async exactly as given; no coercion is applied.
# Note that #async? only reports true when the stored value equals +true+.
#
# @param value the new value for the async flag
def async=(value)
  @async = value
end

Instance Method Details

#after_fork ⇒ Object



173
174
175
176
177
178
179
180
181
182
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 173

# Reconfigures the writer after the process has forked.
#
# Replaces the buffer with a new TraceBuffer of the configured size and, when
# the writer's fork policy is FORK_POLICY_SYNC, switches asynchronous mode off.
def after_fork
  # A fork initially shares the parent's buffer (copy-on-write, i.e. until
  # it's written to). The child must not write traces generated by another
  # process, so start over with an empty buffer. Any enqueue operation has to
  # happen after this point.
  self.buffer = TraceBuffer.new(@buffer_size)

  # Nothing more to do unless the writer was configured to go synchronous
  # after a fork.
  return unless @writer_fork_policy == FORK_POLICY_SYNC

  # Synchronous writing can be preferable because the fork may be short lived.
  @async = false
end

#async? ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 149

# Reports whether the writer is currently in asynchronous mode.
#
# @return [Boolean] true only when @async compares equal to +true+
def async?
  # Strict comparison: a value that is merely truthy (or nil) yields false,
  # so the result is always a real boolean.
  @async == true
end

#dequeue ⇒ Object



138
139
140
141
142
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 138

# Takes the next item off the buffer and returns it as a one-element Array.
#
# @return [Array] a single-element array holding whatever +buffer.pop+ returned
def dequeue
  popped = buffer.pop

  # The result is splatted as arguments against TraceWriter#perform, so the
  # popped value has to travel inside an Array to arrive as one argument.
  [popped]
end

#enqueue(trace) ⇒ Object



134
135
136
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 134

# Adds a trace to the writer's buffer.
#
# @param trace the trace to push onto the buffer
# @return whatever the buffer's +push+ returns
def enqueue(trace)
  buffer.push(trace)
end

#fork_policy=(policy) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 153

# Sets the writer's fork policy and the matching policy of the async thread.
#
# The writer-level policy is translated to a Core::Workers::Async::Thread
# policy, which is passed to +super+; the untranslated value is kept in
# @writer_fork_policy for #after_fork to consult.
#
# @param policy the writer fork policy (e.g. FORK_POLICY_ASYNC, FORK_POLICY_SYNC,
#   or the thread's own FORK_POLICY_STOP)
def fork_policy=(policy)
  thread_stop = Core::Workers::Async::Thread::FORK_POLICY_STOP

  translated =
    case policy
    # Already a thread-level "stop": pass it through as is.
    when thread_stop then policy
    # Sync writers bypass the worker thread and write inline, so the thread
    # itself should simply stop after a fork.
    when FORK_POLICY_SYNC then thread_stop
    # Anything else keeps writing asynchronously: restart the thread.
    else Core::Workers::Async::Thread::FORK_POLICY_RESTART
    end

  # Thread policy first (handled by the included worker module)...
  super(translated)

  # ...then remember the policy exactly as the caller supplied it.
  @writer_fork_policy = policy
end

#perform(traces) ⇒ Object

NOTE: #perform is wrapped by other modules:

Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter

WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#perform is spec’d to return the result from the writer, whereas this method always returns nil.



121
122
123
124
125
126
127
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 121

# Writes the given traces through TraceWriter#perform and backs the loop off
# when any of the responses reports a server error.
#
# @param traces the traces to hand to the parent implementation
# @return [nil] always nil; the responses from the parent are not returned
def perform(traces)
  responses = super(traces)

  # One server error among the responses is enough to slow the loop down.
  loop_back_off! if responses.any?(&:server_error?)

  nil
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



129
130
131
132
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 129

# Stops the worker, closing the buffer first if the worker is running.
#
# @param force_stop passed through to the parent #stop; defaults to +false+
# @param timeout passed through to the parent #stop; defaults to @shutdown_timeout
# @return whatever the parent #stop returns
def stop(force_stop = false, timeout = @shutdown_timeout)
  # Only a running worker gets its buffer closed before shutdown.
  if running?
    buffer.close
  end

  super(force_stop, timeout)
end

#work_pending? ⇒ Boolean

Are there more traces to be processed next?

Returns:

  • (Boolean)


145
146
147
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 145

# Tells whether the buffer still holds traces waiting to be processed.
#
# @return [Boolean] false when the buffer is empty, true otherwise
def work_pending?
  return false if buffer.empty?

  true
end

#write(trace) ⇒ Object

WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#write is spec’d to return the result from the writer, whereas this method returns something else when running in async mode.



186
187
188
189
190
191
192
193
194
195
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 186

# Submits a trace: queued for the worker in async mode, written directly otherwise.
#
# @param trace the trace to write
# @return the result of #enqueue when running asynchronously, otherwise the
#   result of #write_traces
def write(trace)
  # Kick the worker before touching the buffer. If the process has forked this
  # triggers #after_fork, which resets the buffer -- queueing first would
  # therefore drop the current trace.
  perform

  # Asynchronous mode: hand the trace to the buffer and let the worker flush it.
  return enqueue(trace) if async?

  # Synchronous mode: short-circuit and write the trace right away.
  write_traces([trace])
end