Class: Datadog::Statsd::Sender
- Inherits:
-
Object
- Object
- Datadog::Statsd::Sender
- Defined in:
- lib/datadog/statsd/sender.rb
Overview
Sender is using a companion thread to flush and pack messages in a ‘MessageBuffer`. The communication with this thread is done using a `Queue`. If the thread is dead, it is starting a new one to avoid having a blocked Sender with no companion thread to communicate with (most of the time, having a dead companion thread means that a fork just happened and that we are running in the child process).
Constant Summary collapse
- CLOSEABLE_QUEUES =
Queue.instance_methods.include?(:close)
Instance Method Summary collapse
- #add(message) ⇒ Object
- #flush(sync: false) ⇒ Object
-
#initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) ⇒ Sender
constructor
A new instance of Sender.
- #rendez_vous ⇒ Object
- #start ⇒ Object
-
#stop(join_worker: true) ⇒ Object
when calling stop, make sure that no other threads is trying to close the sender nor trying to continue to ‘#add` more message into the sender.
Constructor Details
#initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) ⇒ Sender
Returns a new instance of Sender.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/datadog/statsd/sender.rb', line 15 def initialize(, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) @message_buffer = @telemetry = telemetry @queue_size = queue_size @logger = logger @mx = Mutex.new @queue_class = queue_class @thread_class = thread_class @done = false @flush_timer = if flush_interval Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) } else nil end end |
Instance Method Details
#add(message) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/datadog/statsd/sender.rb', line 64 def add() raise ArgumentError, 'Start sender first' unless # if the thread does not exist, we assume we are running in a forked process, # empty the message queue and message buffers (these messages belong to # the parent process) and spawn a new companion thread. if sender_thread.nil? || !sender_thread.alive? @mx.synchronize { # an attempt was previously made to start the sender thread but failed. # skipping re-start return if @done # a call from another thread has already re-created # the companion thread before this one acquired the lock break if sender_thread.alive? @logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger .close if CLOSEABLE_QUEUES @message_queue = nil .reset start @flush_timer.start if @flush_timer && @flush_timer.stop? } end if .length <= @queue_size << else if @telemetry bytesize = .respond_to?(:bytesize) ? .bytesize : 0 @telemetry.dropped_queue(packets: 1, bytes: bytesize) end end end |
#flush(sync: false) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/datadog/statsd/sender.rb', line 31 def flush(sync: false) # keep a copy around in case another thread is calling #stop while this method is running = # don't try to flush if there is no message_queue instantiated or # no companion thread running if ! @logger.debug { "Statsd: can't flush: no message queue ready" } if @logger return end if !sender_thread.alive? @logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger return end .push(:flush) rendez_vous if sync end |
#rendez_vous ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/datadog/statsd/sender.rb', line 50 def rendez_vous # could happen if #start hasn't be called return unless # Initialize and get the thread's sync queue queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new) # tell sender-thread to notify us in the current # thread's queue .push(queue) # wait for the sender thread to send a message # once the flush is done queue.pop end |
#start ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/datadog/statsd/sender.rb', line 98 def start raise ArgumentError, 'Sender already started' if # initialize a new message queue for the background thread @message_queue = @queue_class.new begin # start background thread @sender_thread = @thread_class.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') rescue ThreadError => e @logger.debug { "Statsd: Failed to start sender thread: #{e.}" } if @logger @mx.synchronize { @done = true } end @flush_timer.start if @flush_timer end |
#stop(join_worker: true) ⇒ Object
when calling stop, make sure that no other threads is trying to close the sender nor trying to continue to ‘#add` more message into the sender.
119 120 121 122 123 124 125 126 127 |
# File 'lib/datadog/statsd/sender.rb', line 119 def stop(join_worker: true) @flush_timer.stop if @flush_timer = @message_queue .close if sender_thread = @sender_thread sender_thread.join if sender_thread && join_worker end |