Class: Trace::ZipkinKafkaSender

Inherits:
ZipkinSenderBase show all
Defined in:
lib/zipkin-tracer/zipkin_kafka_sender.rb

Overview

This class sends information to Zipkin through Kafka. Spans are encoded using Thrift.

Constant Summary collapse

DEFAULT_KAFKA_TOPIC =
"zipkin".freeze
IP_FORMAT =
:i32

Instance Method Summary collapse

Methods inherited from ZipkinSenderBase

#end_span, #skip_flush?, #start_span, #with_new_span

Constructor Details

#initialize(options = {}) ⇒ ZipkinKafkaSender

Returns a new instance of ZipkinKafkaSender.



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/zipkin-tracer/zipkin_kafka_sender.rb', line 19

# Builds a sender that publishes to a Kafka topic.
#
# The producer is taken from +options[:producer]+ when that object responds
# to +#push+; otherwise +options[:zookeepers]+ is handed to
# +initialize_hermann_producer+. The full options hash is then forwarded to
# the superclass initializer.
#
# @param options [Hash] sender configuration
# @option options [Object] :topic topic to publish to; falls back to
#   DEFAULT_KAFKA_TOPIC when absent
# @option options [#push] :producer producer object to use as-is
# @option options [Object] :zookeepers value passed to
#   +initialize_hermann_producer+ when no usable :producer is given
# @raise [ArgumentError] if neither a usable :producer nor :zookeepers is given
def initialize(options = {})
  @topic = options[:topic] || DEFAULT_KAFKA_TOPIC
  producer = options[:producer]

  # nil/false never respond to #push, so no separate presence check is needed.
  if producer.respond_to?(:push)
    @producer = producer
  elsif options[:zookeepers]
    initialize_hermann_producer(options[:zookeepers])
  else
    # The message names the option key that is actually read (:zookeepers).
    raise ArgumentError, "No (kafka) :producer option (accepting #push) and no :zookeepers option provided."
  end
  super(options)
end

Instance Method Details

#flush! ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/zipkin-tracer/zipkin_kafka_sender.rb', line 32

# Writes every pending span through a Thrift binary protocol into a memory
# buffer and pushes that buffer to @topic via @producer.
#
# Delivery is best-effort: any StandardError raised while resolving,
# encoding or pushing is swallowed. Non-StandardError exceptions (Interrupt,
# SystemExit, NoMemoryError, ...) are deliberately NOT rescued, so the
# process can still be interrupted or shut down while a flush is in progress.
#
# @return [Object, nil] whatever +#each+ returns on the resolved spans, or
#   nil when an error was swallowed
def flush!
  resolved_spans = ::ZipkinTracer::HostnameResolver.new.spans_with_ips(spans, IP_FORMAT)
  resolved_spans.each do |span|
    # Fresh, unfrozen string for the transport; a '' literal may be frozen
    # (or chilled) depending on the Ruby version and file pragmas.
    buf = String.new
    trans = Thrift::MemoryBufferTransport.new(buf)
    oprot = Thrift::BinaryProtocol.new(trans)
    span.to_thrift.write(oprot)
    retval = @producer.push(buf, topic: @topic)

    # If @producer#push returns a promise/promise-like object, block until it
    # resolves
    retval.value! if retval.respond_to?(:value!)
  end
rescue StandardError
  # Ignore socket errors, etc
  nil
end